/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.presto.Session;
import com.facebook.presto.Session.SessionBuilder;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.tree.AstVisitor;
import com.facebook.presto.sql.tree.ColumnDefinition;
import com.facebook.presto.sql.tree.CreateTable;
import com.facebook.presto.sql.tree.Identifier;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.testing.assertions.Assert;
import com.facebook.presto.tests.AbstractTestIntegrationSmokeTest;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateProperties;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

import static com.facebook.presto.SystemSessionProperties.LEGACY_TIMESTAMP;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
import static com.facebook.presto.iceberg.IcebergTableProperties.COMMIT_RETRIES;
import static com.facebook.presto.iceberg.IcebergTableProperties.DELETE_MODE;
import static com.facebook.presto.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.FORMAT_VERSION;
import static com.facebook.presto.iceberg.IcebergTableProperties.METADATA_DELETE_AFTER_COMMIT;
import static com.facebook.presto.iceberg.IcebergTableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
import static com.facebook.presto.iceberg.IcebergTableProperties.METRICS_MAX_INFERRED_COLUMN;
import static com.facebook.presto.iceberg.IcebergUtil.MIN_FORMAT_VERSION_FOR_DELETE;
import static com.facebook.presto.iceberg.IcebergWarningCode.USE_OF_DEPRECATED_TABLE_PROPERTY;
import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.METADATA_FOLDER_NAME;
import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.getFileSystem;
import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.resolveLatestMetadataLocation;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.String.format;
import static java.nio.file.Files.createTempDirectory;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static java.util.stream.IntStream.range;
import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public abstract class IcebergDistributedSmokeTestBase
        extends AbstractTestIntegrationSmokeTest
{
    private final CatalogType catalogType;

    protected IcebergDistributedSmokeTestBase(CatalogType catalogType)
    {
        this.catalogType = requireNonNull(catalogType, "catalogType is null");
    }

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return IcebergQueryRunner.builder().setCatalogType(catalogType).build().getQueryRunner();
    }

    @Test
    public void testTimestamp()
    {
        assertUpdate("CREATE TABLE test_timestamp (x timestamp)");
        assertUpdate("INSERT INTO test_timestamp VALUES (timestamp '2017-05-01 10:12:34')", 1);
        assertQuery("SELECT * FROM test_timestamp", "SELECT CAST('2017-05-01 10:12:34' AS TIMESTAMP)");
        dropTable(getSession(), "test_timestamp");
    }

    @Test
    public void testTimestampWithTimeZone()
    {
        assertQuerySucceeds("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'");
        assertQuerySucceeds("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone");
        dropTable(getSession(), "test_timestamp_with_timezone");

        assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) WITH ( \"write.format.default\" = 'ORC') AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Unsupported Type: timestamp with time zone");
    }

    @Test
    public void testTime()
    {
        assertUpdate("CREATE TABLE test_time (x time)");
        assertUpdate("INSERT INTO test_time VALUES (time '10:12:34')", 1);
        assertQuery("SELECT * FROM test_time", "SELECT CAST('10:12:34' AS TIME)");
        dropTable(getSession(), "test_time");
    }

    @Test
    @Override
    public void testDescribeTable()
    {
        MaterializedResult expectedColumns = resultBuilder(getQueryRunner().getDefaultSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
                .row("orderkey", "bigint", "", "")
                .row("custkey", "bigint", "", "")
                .row("orderstatus", "varchar", "", "")
                .row("totalprice", "double", "", "")
                .row("orderdate", "date", "", "")
                .row("orderpriority", "varchar", "", "")
                .row("clerk", "varchar", "", "")
                .row("shippriority", "integer", "", "")
                .row("comment", "varchar", "", "")
                .build();
        MaterializedResult actualColumns = computeActual("DESCRIBE orders");
        Assert.assertEquals(actualColumns, expectedColumns);
    }

    @Test
    public void testShowCreateTable()
    {
        String schemaName = getSession().getSchema().get();

        validateShowCreateTable("orders",
                ImmutableList.of(
                        columnDefinition("orderkey", "bigint"),
                        columnDefinition("custkey", "bigint"),
                        columnDefinition("orderstatus", "varchar"),
                        columnDefinition("totalprice", "double"),
                        columnDefinition("orderdate", "date"),
                        columnDefinition("orderpriority", "varchar"),
                        columnDefinition("clerk", "varchar"),
                        columnDefinition("shippriority", "integer"),
                        columnDefinition("comment", "varchar")),
                getCustomizedTableProperties(ImmutableMap.of(
                        "location", "'" + getLocation(schemaName, "orders") + "'")));
    }

    @Test
    public void testTableWithSpecifiedWriteDataLocation()
            throws IOException
    {
        String tableName = "test_table_with_specified_write_data_location";
        String dataWriteLocation = createTempDirectory(tableName).toAbsolutePath().toString();
        try {
            assertUpdate(format("create table %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation));
            assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3);
            assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')");
            assertUpdate(format("delete from %s where a > 2", tableName), 1);
            assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')");
        }
        finally {
            try {
                getQueryRunner().execute("drop table if exists " + tableName);
            }
            catch (Exception e) {
                // ignored for hive catalog compatibility
            }
        }
    }

    @Test
    public void testPartitionedTableWithSpecifiedWriteDataLocation()
            throws IOException
    {
        String tableName = "test_partitioned_table_with_specified_write_data_location";
        String dataWriteLocation = createTempDirectory(tableName).toAbsolutePath().toString();
        try {
            assertUpdate(format("create table %s(a int, b varchar) with (partitioning = ARRAY['a'], \"write.data.path\" = '%s')", tableName, dataWriteLocation));
            assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3);
            assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')");
            assertUpdate(format("delete from %s where a > 2", tableName), 1);
            assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')");
        }
        finally {
            try {
                getQueryRunner().execute("drop table if exists " + tableName);
            }
            catch (Exception e) {
                // ignored for hive catalog compatibility
            }
        }
    }

    @Test
    public void testShowCreateTableWithSpecifiedWriteDataLocation()
            throws IOException
    {
        String tableName = "test_show_table_with_specified_write_data_location";
        String dataWriteLocation = createTempDirectory("test1").toAbsolutePath().toString();
        try {
            assertUpdate(format("CREATE TABLE %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation));
            String schemaName = getSession().getSchema().get();
            String location = getLocation(schemaName, tableName);

            validateShowCreateTable(tableName,
                    ImmutableList.of(columnDefinition("a", "integer"), columnDefinition("b", "varchar")),
                    getCustomizedTableProperties(ImmutableMap.of(
                            "location", "'" + location + "'",
                            "write.data.path", "'" + dataWriteLocation + "'")));
        }
        finally {
            try {
                getQueryRunner().execute("DROP TABLE IF EXISTS " + tableName);
            }
            catch (Exception e) {
                // ignored for hive catalog compatibility
            }
        }
    }

    @Test
    public void testDecimal()
    {
        testWithAllFileFormats((session, format) -> testDecimalForFormat(session, format));
    }

    private void testDecimalForFormat(Session session, FileFormat format)
    {
        testDecimalWithPrecisionAndScale(session, format, 1, 0);
        testDecimalWithPrecisionAndScale(session, format, 8, 6);
        testDecimalWithPrecisionAndScale(session, format, 9, 8);
        testDecimalWithPrecisionAndScale(session, format, 10, 8);

        testDecimalWithPrecisionAndScale(session, format, 18, 1);
        testDecimalWithPrecisionAndScale(session, format, 18, 8);
        testDecimalWithPrecisionAndScale(session, format, 18, 17);

        testDecimalWithPrecisionAndScale(session, format, 17, 16);
        testDecimalWithPrecisionAndScale(session, format, 18, 17);
        testDecimalWithPrecisionAndScale(session, format, 24, 10);
        testDecimalWithPrecisionAndScale(session, format, 30, 10);
        testDecimalWithPrecisionAndScale(session, format, 37, 26);
        testDecimalWithPrecisionAndScale(session, format, 38, 37);

        testDecimalWithPrecisionAndScale(session, format, 38, 17);
        testDecimalWithPrecisionAndScale(session, format, 38, 37);
    }

    private void testDecimalWithPrecisionAndScale(Session session, FileFormat format, int precision, int scale)
    {
        checkArgument(precision >= 1 && precision <= 38, "Decimal precision (%s) must be between 1 and 38 inclusive", precision);
        checkArgument(scale < precision && scale >= 0, "Decimal scale (%s) must be less than the precision (%s) and non-negative", scale, precision);

        String tableName = format("test_decimal_p%d_s%d", precision, scale);
        String decimalType = format("DECIMAL(%d,%d)", precision, scale);
        String beforeTheDecimalPoint = "12345678901234567890123456789012345678".substring(0, precision - scale);
        String afterTheDecimalPoint = "09876543210987654321098765432109876543".substring(0, scale);
        String decimalValue = format("%s.%s", beforeTheDecimalPoint, afterTheDecimalPoint);

        assertUpdate(session, format("CREATE TABLE %s (x %s) WITH (\"write.format.default\" = '%s')", tableName, decimalType, format.name()));
        assertUpdate(session, format("INSERT INTO %s (x) VALUES (CAST('%s' AS %s))", tableName, decimalValue, decimalType), 1);
        assertQuery(session, format("SELECT * FROM %s", tableName), format("SELECT CAST('%s' AS %s)", decimalValue, decimalType));
        dropTable(session, tableName);
    }

    @Test
    public void testSimplifyPredicateOnPartitionedColumn()
    {
        try {
            assertUpdate("create table simplify_predicate_on_partition_column(a int) with (partitioning = ARRAY['a'])");
            String insertValues = range(0, 100)
                    .mapToObj(Integer::toString)
                    .collect(joining(", "));
            assertUpdate("insert into simplify_predicate_on_partition_column values " + insertValues, 100);

            String inValues = range(0, 50)
                    .map(i -> i * 2 + 1)
                    .mapToObj(Integer::toString)
                    .collect(joining(", "));
            String notInValues = range(0, 50)
                    .map(i -> i * 2)
                    .mapToObj(Integer::toString)
                    .collect(joining(", "));

            assertQuery("select * from simplify_predicate_on_partition_column where a in (" + inValues + ")", "values " + inValues);
            assertQuery("select * from simplify_predicate_on_partition_column where a not in (" + inValues + ")", "values " + notInValues);
        }
        finally {
            assertUpdate("drop table if exists simplify_predicate_on_partition_column");
        }
    }

    @Test
    public void testParquetPartitionByTimestamp()
    {
        // TODO
    }

    @Test
    public void testParquetSelectByTimestamp()
    {
        // TODO
    }

    @Test
    public void testOrcPartitionByTimestamp()
    {
        // TODO
    }

    @Test
    public void testOrcSelectByTimestamp()
    {
        // TODO
    }

    @Test
    public void testCreatePartitionedTable()
    {
        testWithAllFileFormats(this::testCreatePartitionedTable);
    }

    private void testCreatePartitionedTable(Session session, FileFormat fileFormat)
    {
        @Language("SQL") String createTable = "" +
                "CREATE TABLE test_partitioned_table (" +
                "  _string VARCHAR" +
                ", _bigint BIGINT" +
                ", _integer INTEGER" +
                ", _real REAL" +
                ", _double DOUBLE" +
                ", _boolean BOOLEAN" +
                ", _decimal_short DECIMAL(3,2)" +
                ", _decimal_long DECIMAL(30,10)" +
                ", _date DATE" +
                ") " +
                "WITH (" +
                "\"write.format.default\" = '" + fileFormat + "', " +
                "partitioning = ARRAY[" +
                "  '_string'," +
                "  '_integer'," +
                "  '_bigint'," +
                "  '_boolean'," +
                "  '_real'," +
                "  '_double'," +
                "  '_decimal_short', " +
                "  '_decimal_long'," +
                "  '_date']" +
                ")";

        assertUpdate(session, createTable);

        MaterializedResult result = computeActual("SELECT * from test_partitioned_table");
        assertEquals(result.getRowCount(), 0);

        @Language("SQL") String select = "" +
                "SELECT" +
                " 'foo' _string" +
                ", CAST(123 AS BIGINT) _bigint" +
                ", 456 _integer" +
                ", CAST('123.45' AS REAL) _real" +
                ", CAST('3.14' AS DOUBLE) _double" +
                ", true _boolean" +
                ", CAST('3.14' AS DECIMAL(3,2)) _decimal_short" +
                ", CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) _decimal_long" +
                ", CAST('2017-05-01' AS DATE) _date";

        assertUpdate(session, "INSERT INTO test_partitioned_table " + select, 1);
        assertQuery(session, "SELECT * from test_partitioned_table", select);
        assertQuery(session, "" +
                        "SELECT * FROM test_partitioned_table WHERE" +
                        " 'foo' = _string" +
                        " AND 456 = _integer" +
                        " AND CAST(123 AS BIGINT) = _bigint" +
                        " AND true = _boolean" +
                        " AND CAST('3.14' AS DECIMAL(3,2)) = _decimal_short" +
                        " AND CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) = _decimal_long" +
                        " AND CAST('2017-05-01' AS DATE) = _date",
                select);

        dropTable(session, "test_partitioned_table");
    }

    @Test
    public void testCreatePartitionedTableWithNestedTypes()
    {
        testWithAllFileFormats(this::testCreatePartitionedTableWithNestedTypes);
    }

    private void testCreatePartitionedTableWithNestedTypes(Session session, FileFormat fileFormat)
    {
        @Language("SQL") String createTable = "" +
                "CREATE TABLE test_partitioned_table_nested_type (" +
                "  _string VARCHAR" +
                ", _struct ROW(_field1 INT, _field2 VARCHAR)" +
                ", _date DATE" +
                ") " +
                "WITH (" +
                "\"write.format.default\" = '" + fileFormat + "', " +
                "partitioning = ARRAY['_date']" +
                ")";

        assertUpdate(session, createTable);

        dropTable(session, "test_partitioned_table_nested_type");
    }

    @Test
    public void testPartitionedTableWithNullValues()
    {
        testWithAllFileFormats(this::testPartitionedTableWithNullValues);
    }

    private void testPartitionedTableWithNullValues(Session session, FileFormat fileFormat)
    {
        @Language("SQL") String createTable = "" +
                "CREATE TABLE test_partitioned_table_with_null_values (" +
                "  _string VARCHAR" +
                ", _bigint BIGINT" +
                ", _integer INTEGER" +
                ", _real REAL" +
                ", _double DOUBLE" +
                ", _boolean BOOLEAN" +
                ", _decimal_short DECIMAL(3,2)" +
                ", _decimal_long DECIMAL(30,10)" +
                ", _date DATE" +
                ") " +
                "WITH (" +
                "\"write.format.default\" = '" + fileFormat + "', " +
                "partitioning = ARRAY[" +
                "  '_string'," +
                "  '_integer'," +
                "  '_bigint'," +
                "  '_boolean'," +
                "  '_real'," +
                "  '_double'," +
                "  '_decimal_short', " +
                "  '_decimal_long'," +
                "  '_date']" +
                ")";

        assertUpdate(session, createTable);

        MaterializedResult result = computeActual("SELECT * from test_partitioned_table_with_null_values");
        assertEquals(result.getRowCount(), 0);

        @Language("SQL") String select = "" +
                "SELECT" +
                " null _string" +
                ", null _bigint" +
                ", null _integer" +
                ", null _real" +
                ", null _double" +
                ", null _boolean" +
                ", null _decimal_short" +
                ", null _decimal_long" +
                ", null _date";

        assertUpdate(session, "INSERT INTO test_partitioned_table_with_null_values " + select, 1);
        assertQuery(session, "SELECT * from test_partitioned_table_with_null_values", select);
        dropTable(session, "test_partitioned_table_with_null_values");
    }

    @Test
    public void testCreatePartitionedTableAs()
    {
        testWithAllFileFormats(this::testCreatePartitionedTableAs);
    }

    protected void testCreatePartitionedTableAs(Session session, FileFormat fileFormat)
    {
        String tableName = "test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH);
        @Language("SQL") String createTable = "" +
                "CREATE TABLE " + tableName + " " +
                "WITH (" +
                "\"write.format.default\" = '" + fileFormat + "', " +
                "partitioning = ARRAY['ORDER_STATUS', 'Ship_Priority', 'Bucket(order_key,9)']" +
                ") " +
                "AS " +
                "SELECT orderkey AS order_key, shippriority AS ship_priority, orderstatus AS order_status " +
                "FROM tpch.tiny.orders";

        assertUpdate(session, createTable, "SELECT count(*) from orders");

        validateShowCreateTable(tableName,
                ImmutableList.of(
                        columnDefinition("order_key", "bigint"),
                        columnDefinition("ship_priority", "integer"),
                        columnDefinition("order_status", "varchar")),
                getCustomizedTableProperties(ImmutableMap.of(
                        "write.format.default", "'" + fileFormat + "'",
                        "location", "'" + getLocation(getSession().getSchema().get(), tableName) + "'",
                        "partitioning", "ARRAY['order_status','ship_priority','bucket(order_key, 9)']")));

        assertQuery(session, "SELECT * from " + tableName, "SELECT orderkey, shippriority, orderstatus FROM orders");

        dropTable(session, tableName);
    }

    @Test
    public void testPartitionOnDecimalColumn()
    {
        testWithAllFileFormats(this::testPartitionedByShortDecimalType);
        testWithAllFileFormats(this::testPartitionedByLongDecimalType);
        testWithAllFileFormats(this::testTruncateShortDecimalTransform);
        testWithAllFileFormats(this::testTruncateLongDecimalTransform);
    }

    public void testPartitionedByShortDecimalType(Session session, FileFormat format)
    {
        // create iceberg table partitioned by column of ShortDecimalType, and insert some data
        assertUpdate(session, "drop table if exists test_partition_columns_short_decimal");
        assertUpdate(session, format("create table test_partition_columns_short_decimal(a bigint, b decimal(9, 2))" +
                " with (\"write.format.default\" = '%s', partitioning = ARRAY['b'])", format.name()));
        assertUpdate(session, "insert into test_partition_columns_short_decimal values(1, 12.31), (2, 133.28)", 2);
        assertQuery(session, "select * from test_partition_columns_short_decimal", "values(1, 12.31), (2, 133.28)");

        // validate column of ShortDecimalType exists in query filter
        assertQuery(session, "select * from test_partition_columns_short_decimal where b = 133.28", "values(2, 133.28)");
        assertQuery(session, "select * from test_partition_columns_short_decimal where b = 12.31", "values(1, 12.31)");

        // validate column of ShortDecimalType in system table "partitions"
        assertQuery(session, "select b, row_count from \"test_partition_columns_short_decimal$partitions\"", "values(12.31, 1), (133.28, 1)");

        // validate column of TimestampType exists in delete filter
        assertUpdate(session, "delete from test_partition_columns_short_decimal WHERE b = 12.31", 1);
        assertQuery(session, "select * from test_partition_columns_short_decimal", "values(2, 133.28)");
        assertQuery(session, "select * from test_partition_columns_short_decimal where b = 133.28", "values(2, 133.28)");

        assertQuery(session, "select b, row_count from \"test_partition_columns_short_decimal$partitions\"", "values(133.28, 1)");

        assertUpdate(session, "drop table test_partition_columns_short_decimal");
    }

    public void testPartitionedByLongDecimalType(Session session, FileFormat format)
    {
        // create iceberg table partitioned by column of ShortDecimalType, and insert some data
        assertUpdate(session, "drop table if exists test_partition_columns_long_decimal");
        assertUpdate(session, format("create table test_partition_columns_long_decimal(a bigint, b decimal(20, 2))" +
                " with (\"write.format.default\" = '%s', partitioning = ARRAY['b'])", format.name()));
        assertUpdate(session, "insert into test_partition_columns_long_decimal values(1, 11111111111111112.31), (2, 133.28)", 2);
        assertQuery(session, "select * from test_partition_columns_long_decimal", "values(1, 11111111111111112.31), (2, 133.28)");

        // validate column of ShortDecimalType exists in query filter
        assertQuery(session, "select * from test_partition_columns_long_decimal where b = 133.28", "values(2, 133.28)");
        assertQuery(session, "select * from test_partition_columns_long_decimal where b = 11111111111111112.31", "values(1, 11111111111111112.31)");

        // validate column of ShortDecimalType in system table "partitions"
        assertQuery(session, "select b, row_count from \"test_partition_columns_long_decimal$partitions\"",
                "values(11111111111111112.31, 1), (133.28, 1)");

        // validate column of TimestampType exists in delete filter
        assertUpdate(session, "delete from test_partition_columns_long_decimal WHERE b = 11111111111111112.31", 1);
        assertQuery(session, "select * from test_partition_columns_long_decimal", "values(2, 133.28)");
        assertQuery(session, "select * from test_partition_columns_long_decimal where b = 133.28", "values(2, 133.28)");

        assertQuery(session, "select b, row_count from \"test_partition_columns_long_decimal$partitions\"",
                "values(133.28, 1)");

        assertUpdate(session, "drop table test_partition_columns_long_decimal");
    }

    public void testTruncateShortDecimalTransform(Session session, FileFormat format)
    {
        assertUpdate(session, format("CREATE TABLE test_truncate_decimal_transform (d DECIMAL(9, 2), b BIGINT)" +
                " WITH (\"write.format.default\" = '%s', partitioning = ARRAY['truncate(d, 10)'])", format.name()));
        String select = "SELECT d_trunc, row_count, d.min, d.max FROM \"test_truncate_decimal_transform$partitions\"";

        assertUpdate(session, "INSERT INTO test_truncate_decimal_transform VALUES" +
                "(NULL, 101)," +
                "(12.34, 1)," +
                "(12.30, 2)," +
                "(12.29, 3)," +
                "(0.05, 4)," +
                "(-0.05, 5)", 6);

        assertQuery(session, "SELECT d_trunc FROM \"test_truncate_decimal_transform$partitions\"", "VALUES NULL, 12.30, 12.20, 0.00, -0.10");

        assertQuery(session, "SELECT b FROM test_truncate_decimal_transform WHERE d IN (12.34, 12.30)", "VALUES 1, 2");
        assertQuery(session, select + " WHERE d_trunc = 12.30",
                "VALUES (12.30, 2, 12.30, 12.34)");

        assertQuery(session, "SELECT b FROM test_truncate_decimal_transform WHERE d = 12.29", "VALUES 3");
        assertQuery(session, select + " WHERE d_trunc = 12.20",
                "VALUES (12.20, 1, 12.29, 12.29)");

        assertQuery(session, "SELECT b FROM test_truncate_decimal_transform WHERE d = 0.05", "VALUES 4");
        assertQuery(session, select + " WHERE d_trunc = 0.00",
                "VALUES (0.00, 1, 0.05, 0.05)");

        assertQuery(session, "SELECT b FROM test_truncate_decimal_transform WHERE d = -0.05", "VALUES 5");
        assertQuery(session, select + " WHERE d_trunc = -0.10",
                "VALUES (-0.10, 1, -0.05, -0.05)");

        // Exercise IcebergMetadata.applyFilter with non-empty Constraint.predicate, via non-pushdownable predicates
        assertQuery(session, "SELECT * FROM test_truncate_decimal_transform WHERE d * 100 % 10 = 9 AND b % 7 = 3",
                "VALUES (12.29, 3)");

        assertUpdate(session, "DROP TABLE test_truncate_decimal_transform");
    }

    public void testTruncateLongDecimalTransform(Session session, FileFormat format)
    {
        assertUpdate(session, format("CREATE TABLE test_truncate_long_decimal_transform (d DECIMAL(20, 2), b BIGINT)" +
                " WITH (\"write.format.default\" = '%s', partitioning = ARRAY['truncate(d, 10)'])", format.name()));
        String select = "SELECT d_trunc, row_count, d.min, d.max FROM \"test_truncate_long_decimal_transform$partitions\"";

        assertUpdate(session, "INSERT INTO test_truncate_long_decimal_transform VALUES" +
                "(NULL, 101)," +
                "(12.34, 1)," +
                "(12.30, 2)," +
                "(11111111111111112.29, 3)," +
                "(0.05, 4)," +
                "(-0.05, 5)", 6);

        assertQuery(session, "SELECT d_trunc FROM \"test_truncate_long_decimal_transform$partitions\"", "VALUES NULL, 12.30, 11111111111111112.20, 0.00, -0.10");

        assertQuery(session, "SELECT b FROM test_truncate_long_decimal_transform WHERE d IN (12.34, 12.30)", "VALUES 1, 2");
        assertQuery(session, select + " WHERE d_trunc = 12.30",
                "VALUES (12.30, 2, 12.30, 12.34)");

        assertQuery(session, "SELECT b FROM test_truncate_long_decimal_transform WHERE d = 11111111111111112.29", "VALUES 3");
        assertQuery(session, select + " WHERE d_trunc = 11111111111111112.20",
                "VALUES (11111111111111112.20, 1, 11111111111111112.29, 11111111111111112.29)");

        assertQuery(session, "SELECT b FROM test_truncate_long_decimal_transform WHERE d = 0.05", "VALUES 4");
        assertQuery(session, select + " WHERE d_trunc = 0.00",
                "VALUES (0.00, 1, 0.05, 0.05)");

        assertQuery(session, "SELECT b FROM test_truncate_long_decimal_transform WHERE d = -0.05", "VALUES 5");
        assertQuery(session, select + " WHERE d_trunc = -0.10",
                "VALUES (-0.10, 1, -0.05, -0.05)");

        // Exercise IcebergMetadata.applyFilter with non-empty Constraint.predicate, via non-pushdownable predicates
        assertQuery(session, "SELECT * FROM test_truncate_long_decimal_transform WHERE d * 100 % 10 = 9 AND b % 7 = 3",
                "VALUES (11111111111111112.29, 3)");

        assertUpdate(session, "DROP TABLE test_truncate_long_decimal_transform");
    }

    @Test
    public void testColumnComments()
    {
        Session session = getSession();
        assertUpdate(session, "CREATE TABLE test_column_comments (_bigint BIGINT COMMENT 'test column comment')");

        assertQuery(session, "SHOW COLUMNS FROM test_column_comments",
                "VALUES ('_bigint', 'bigint', '', 'test column comment')");

        assertUpdate("ALTER TABLE test_column_comments ADD COLUMN _varchar VARCHAR COMMENT 'test new column comment'");
        assertQuery(
                "SHOW COLUMNS FROM test_column_comments",
                "VALUES ('_bigint', 'bigint', '', 'test column comment'), ('_varchar', 'varchar', '', 'test new column comment')");

        dropTable(session, "test_column_comments");
    }

    @Test
    public void testTableComments()
    {
        Session session = getSession();
        String schemaName = session.getSchema().get();

        @Language("SQL") String createTable = "" +
                "CREATE TABLE iceberg.%s.test_table_comments (\n" +
                "   \"_x\" bigint\n" +
                ")\n" +
                "COMMENT '%s'\n" +
                "WITH (\n" +
                "   \"write.format.default\" = 'ORC',\n" +
                "   \"format-version\" = '2'\n" +
                ")";

        assertUpdate(format(createTable, schemaName, "test table comment"));

        validateShowCreateTable("iceberg", schemaName, "test_table_comments",
                ImmutableList.of(columnDefinition("_x", "bigint")),
                "test table comment",
                getCustomizedTableProperties(ImmutableMap.of(
                        "write.format.default", "'ORC'",
                        "location", "'" + getLocation(schemaName, "test_table_comments") + "'")));

        dropTable(session, "test_table_comments");
    }

    @Test
    public void testRollbackSnapshot()
    {
        Session session = getSession();
        assertUpdate(session, "CREATE TABLE test_rollback AS SELECT * FROM (VALUES (123, CAST(321 AS BIGINT))) AS t (col0, col1)", 1);
        long afterCreateTableId = getLatestSnapshotId();

        assertUpdate(session, "INSERT INTO test_rollback (col0, col1) VALUES (123, CAST(987 AS BIGINT))", 1);
        long afterFirstInsertId = getLatestSnapshotId();

        assertUpdate(session, "INSERT INTO test_rollback (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1);
        assertQuery(session, "SELECT * FROM test_rollback ORDER BY col0",
                "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT)), (123, CAST(321 AS BIGINT))");

        assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", session.getSchema().get(), afterFirstInsertId));
        assertQuery(session, "SELECT * FROM test_rollback ORDER BY col0",
                "VALUES (123, CAST(987 AS BIGINT)), (123, CAST(321 AS BIGINT))");

        assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", session.getSchema().get(), afterCreateTableId));
        assertEquals((long) computeActual(session, "SELECT COUNT(*) FROM test_rollback").getOnlyValue(), 1);

        dropTable(session, "test_rollback");
    }

    private long getLatestSnapshotId()
    {
        return (long) computeActual("SELECT snapshot_id FROM \"test_rollback$snapshots\" ORDER BY committed_at DESC LIMIT 1")
                .getOnlyValue();
    }

    @Test
    public void testInsertIntoNotNullColumn()
    {
        // TODO: To support non-null column. (NOT_NULL_COLUMN_CONSTRAINT)
    }

    @Test
    public void testSchemaEvolution()
    {
        // TODO: Support schema evolution for PARQUET. Schema evolution should be id based.
        testSchemaEvolution(getSession(), FileFormat.ORC);
    }

    private void testSchemaEvolution(Session session, FileFormat fileFormat)
    {
        assertUpdate(session, "CREATE TABLE test_schema_evolution_drop_end (col0 INTEGER, col1 INTEGER, col2 INTEGER) WITH (\"write.format.default\" = '" + fileFormat + "')");
        assertUpdate(session, "INSERT INTO test_schema_evolution_drop_end VALUES (0, 1, 2)", 1);
        assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1, 2)");
        assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_end DROP COLUMN col2");
        assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1)");
        assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_end ADD COLUMN col2 INTEGER");
        assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1, NULL)");
        assertUpdate(session, "INSERT INTO test_schema_evolution_drop_end VALUES (3, 4, 5)", 1);
        assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1, NULL), (3, 4, 5)");
        dropTable(session, "test_schema_evolution_drop_end");

        assertUpdate(session, "CREATE TABLE test_schema_evolution_drop_middle (col0 INTEGER, col1 INTEGER, col2 INTEGER) WITH (\"write.format.default\" = '" + fileFormat + "')");
        assertUpdate(session, "INSERT INTO test_schema_evolution_drop_middle VALUES (0, 1, 2)", 1);
        assertQuery(session, "SELECT * FROM test_schema_evolution_drop_middle", "VALUES(0, 1, 2)");
        assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_middle DROP COLUMN col1");
        assertQuery(session, "SELECT * FROM test_schema_evolution_drop_middle", "VALUES(0, 2)");
        assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_middle ADD COLUMN col1 INTEGER");
        assertUpdate(session, "INSERT INTO test_schema_evolution_drop_middle VALUES (3, 4, 5)", 1);
        assertQuery(session, "SELECT * FROM test_schema_evolution_drop_middle", "VALUES(0, 2, NULL), (3, 4, 5)");
        dropTable(session, "test_schema_evolution_drop_middle");
    }

    @Test
    protected void testCreateTableLike()
    {
        Session session = getSession();
        String schemaName = session.getSchema().get();

        assertUpdate(session, "CREATE TABLE test_create_table_like_original (col1 INTEGER, aDate DATE) WITH(format = 'PARQUET', partitioning = ARRAY['aDate'])");
        validatePropertiesForShowCreateTable(session.getCatalog().get(),
                "\"" + schemaName + "\"",
                "test_create_table_like_original",
                getCustomizedTableProperties(ImmutableMap.of(
                        "location", "'" + getLocation(schemaName, "test_create_table_like_original") + "'",
                        "partitioning", "ARRAY['adate']")));

        assertUpdate(session, "CREATE TABLE test_create_table_like_copy0 (LIKE test_create_table_like_original, col2 INTEGER)");
        assertUpdate(session, "INSERT INTO test_create_table_like_copy0 (col1, aDate, col2) VALUES (1, CAST('1950-06-28' AS DATE), 3)", 1);
        assertQuery(session, "SELECT * from test_create_table_like_copy0", "VALUES(1, CAST('1950-06-28' AS DATE), 3)");
        dropTable(session, "test_create_table_like_copy0");

        assertUpdate(session, "CREATE TABLE test_create_table_like_copy1 (LIKE test_create_table_like_original)");

        validatePropertiesForShowCreateTable(session.getCatalog().get(),
                "\"" + schemaName + "\"",
                "test_create_table_like_copy1",
                getCustomizedTableProperties(ImmutableMap.of(
                                "location", "'" + getLocation(schemaName, "test_create_table_like_copy1") + "'")));
        dropTable(session, "test_create_table_like_copy1");

        assertUpdate(session, "CREATE TABLE test_create_table_like_copy2 (LIKE test_create_table_like_original EXCLUDING PROPERTIES)");
        validatePropertiesForShowCreateTable(session.getCatalog().get(),
                "\"" + schemaName + "\"",
                "test_create_table_like_copy2",
                getCustomizedTableProperties(ImmutableMap.of(
                        "location", "'" + getLocation(schemaName, "test_create_table_like_copy2") + "'")));
        dropTable(session, "test_create_table_like_copy2");

        if (!catalogType.equals(HADOOP)) {
            assertUpdate(session, "CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)");
            validatePropertiesForShowCreateTable(session.getCatalog().get(),
                    "\"" + schemaName + "\"",
                    "test_create_table_like_copy3",
                    getCustomizedTableProperties(ImmutableMap.of(
                            "location", "'" + getLocation(schemaName, "test_create_table_like_original") + "'",
                            "partitioning", "ARRAY['adate']")));
            dropTable(session, "test_create_table_like_copy3");

            assertUpdate(session, "CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = 'ORC')");
            validatePropertiesForShowCreateTable(session.getCatalog().get(),
                    "\"" + schemaName + "\"",
                    "test_create_table_like_copy4",
                    getCustomizedTableProperties(ImmutableMap.of(
                            "write.format.default", "'ORC'",
                            "location", "'" + getLocation(schemaName, "test_create_table_like_original") + "'",
                            "partitioning", "ARRAY['adate']")));
            dropTable(session, "test_create_table_like_copy4");
        }
        else {
            assertUpdate(session, "CREATE TABLE test_create_table_like_copy5 (LIKE test_create_table_like_original INCLUDING PROPERTIES)" +
                    " WITH (location = '', format = 'ORC')");
            validatePropertiesForShowCreateTable("test_create_table_like_copy5",
                    getCustomizedTableProperties(ImmutableMap.of(
                            "write.format.default", "'ORC'",
                            "location", "'" + getLocation(schemaName, "test_create_table_like_copy5") + "'",
                            "partitioning", "ARRAY['adate']")));
            dropTable(session, "test_create_table_like_copy5");

            assertQueryFails(session, "CREATE TABLE test_create_table_like_copy6 (LIKE test_create_table_like_original INCLUDING PROPERTIES)",
                    "Cannot set a custom location for a path-based table.*");
        }

        dropTable(session, "test_create_table_like_original");
    }

    @Test
    public void testCreateTableWithFormatVersion()
    {
        testWithAllFormatVersions(this::testCreateTableWithFormatVersion);
    }

    protected void testCreateTableWithFormatVersion(String formatVersion, String defaultDeleteMode)
    {
        @Language("SQL") String createTable = "" +
                "CREATE TABLE test_create_table_with_format_version_" + formatVersion + " " +
                "WITH (" +
                "\"write.format.default\" = 'PARQUET', " +
                "\"format-version\" = '" + formatVersion + "'" +
                ") " +
                "AS " +
                "SELECT orderkey AS order_key, shippriority AS ship_priority, orderstatus AS order_status " +
                "FROM tpch.tiny.orders";

        Session session = getSession();
        assertUpdate(session, createTable, "SELECT count(*) from orders");

        validateShowCreateTable("test_create_table_with_format_version_" + formatVersion,
                ImmutableList.of(
                        columnDefinition("order_key", "bigint"),
                        columnDefinition("ship_priority", "integer"),
                        columnDefinition("order_status", "varchar")),
                getCustomizedTableProperties(ImmutableMap.of(
                        "write.delete.mode", "'" + defaultDeleteMode + "'",
                        "format-version", "'" + formatVersion + "'",
                        "location", "'" + getLocation(getSession().getSchema().get(), "test_create_table_with_format_version_" + formatVersion) + "'",
                        "write.update.mode", "'" + defaultDeleteMode + "'")));

        dropTable(session, "test_create_table_with_format_version_" + formatVersion);
    }

    private void testWithAllFormatVersions(BiConsumer<String, String> test)
    {
        test.accept("1", "copy-on-write");
        test.accept("2", "merge-on-read");
    }

    @Test
    public void testPredicating()
    {
        testWithAllFileFormats(this::testPredicating);
    }

    private void testPredicating(Session session, FileFormat fileFormat)
    {
        assertUpdate(session, "CREATE TABLE test_predicating_on_real (col REAL) WITH (\"write.format.default\" = '" + fileFormat + "')");
        assertUpdate(session, "INSERT INTO test_predicating_on_real VALUES 1.2", 1);
        assertQuery(session, "SELECT * FROM test_predicating_on_real WHERE col = 1.2", "VALUES 1.2");
        dropTable(session, "test_predicating_on_real");
    }

    @Test
    public void testDateTransforms()
    {
        // TODO
    }

    @Test
    public void testTruncateTransform()
    {
        testWithAllFileFormats(this::testTruncateTransformsForFormat);
    }

    private void testTruncateTransformsForFormat(Session session, FileFormat format)
    {
        String select = "SELECT d_trunc, row_count, d.min AS d_min, d.max AS d_max, b.min AS b_min, b.max AS b_max FROM \"test_truncate_transform$partitions\"";

        assertUpdate(session, format("CREATE TABLE test_truncate_transform (d VARCHAR, b BIGINT)" +
                " WITH (\"write.format.default\" = '%s', partitioning = ARRAY['truncate(d, 2)'])", format.name()));

        String insertSql = "INSERT INTO test_truncate_transform VALUES" +
                "('abcd', 1)," +
                "('abxy', 2)," +
                "('ab598', 3)," +
                "('mommy', 4)," +
                "('moscow', 5)," +
                "('Greece', 6)," +
                "('Grozny', 7)";
        assertUpdate(session, insertSql, 7);

        assertQuery(session, "SELECT COUNT(*) FROM \"test_truncate_transform$partitions\"", "SELECT 3");

        assertQuery(session, "SELECT b FROM test_truncate_transform WHERE substr(d, 1, 2) = 'ab'", "SELECT b FROM (VALUES (1), (2), (3)) AS t(b)");
        assertQuery(session, select + " WHERE d_trunc = 'ab'", "VALUES('ab', 3, 'ab598', 'abxy', 1, 3)");

        assertQuery(session, "SELECT b FROM test_truncate_transform WHERE substr(d, 1, 2) = 'mo'", "SELECT b FROM (VALUES (4), (5)) AS t(b)");
        assertQuery(session, select + " WHERE d_trunc = 'mo'", "VALUES('mo', 2, 'mommy', 'moscow', 4, 5)");

        assertQuery(session, "SELECT b FROM test_truncate_transform WHERE substr(d, 1, 2) = 'Gr'", "SELECT b FROM (VALUES (6), (7)) AS t(b)");
        assertQuery(session, select + " WHERE d_trunc = 'Gr'", "VALUES('Gr', 2, 'Greece', 'Grozny', 6, 7)");

        dropTable(session, "test_truncate_transform");
    }

    @Test
    public void testBucketTransform()
    {
        testWithAllFileFormats(this::testBucketTransformsForFormat);
    }

    private void testBucketTransformsForFormat(Session session, FileFormat format)
    {
        String select = "SELECT d_bucket, row_count, d.min AS d_min, d.max AS d_max, b.min AS b_min, b.max AS b_max FROM \"test_bucket_transform$partitions\"";

        assertUpdate(session, format("CREATE TABLE test_bucket_transform (d VARCHAR, b BIGINT)" +
                " WITH (\"write.format.default\" = '%s', partitioning = ARRAY['bucket(d, 2)'])", format.name()));
        String insertSql = "INSERT INTO test_bucket_transform VALUES" +
                "('abcd', 1)," +
                "('abxy', 2)," +
                "('ab598', 3)," +
                "('mommy', 4)," +
                "('moscow', 5)," +
                "('Greece', 6)," +
                "('Grozny', 7)";
        assertUpdate(session, insertSql, 7);

        assertQuery(session, "SELECT COUNT(*) FROM \"test_bucket_transform$partitions\"", "SELECT 2");

        assertQuery(session, select + " WHERE d_bucket = 0", "VALUES(0, 3, 'Grozny', 'mommy', 1, 7)");

        assertQuery(session, select + " WHERE d_bucket = 1", "VALUES(1, 4, 'Greece', 'moscow', 2, 6)");

        dropTable(session, "test_bucket_transform");
    }

    private void testWithAllFileFormats(BiConsumer<Session, FileFormat> test)
    {
        test.accept(getSession(), FileFormat.PARQUET);
        test.accept(getSession(), FileFormat.ORC);
    }

    protected void dropTable(Session session, String table)
    {
        assertUpdate(session, "DROP TABLE IF EXISTS " + table);
        assertFalse(getQueryRunner().tableExists(session, table));
    }

    protected void unregisterTable(String schemaName, String newTableName)
    {
        assertUpdate("CALL system.unregister_table('" + schemaName + "', '" + newTableName + "')");
    }

    @Test
    public void testCreateNestedPartitionedTable()
    {
        testWithAllFileFormats(this::testCreateNestedPartitionedTable);
    }

    public void testCreateNestedPartitionedTable(Session session, FileFormat fileFormat)
    {
        @Language("SQL") String createTable = "" +
                "CREATE TABLE test_nested_table (" +
                " bool BOOLEAN" +
                ", int INTEGER" +
                ", arr ARRAY(VARCHAR)" +
                ", big BIGINT" +
                ", rl REAL" +
                ", dbl DOUBLE" +
                ", mp MAP(INTEGER, VARCHAR)" +
                ", dec DECIMAL(5,2)" +
                ", vc VARCHAR" +
                ", vb VARBINARY" +
                ", str ROW(id INTEGER , vc VARCHAR)" +
                ", dt DATE)" +
                " WITH (partitioning = ARRAY['int']," +
                " \"write.format.default\" = '" + fileFormat + "'" +
                ")";

        assertUpdate(session, createTable);

        assertUpdate(session, "INSERT INTO test_nested_table " +
                " select true, 1, array['uno', 'dos', 'tres'], BIGINT '1', REAL '1.0', DOUBLE '1.0', map(array[1,2,3,4], array['ek','don','teen','char'])," +
                " CAST(1.0 as DECIMAL(5,2))," +
                " 'one', VARBINARY 'binary0/1values',\n" +
                " (CAST(ROW(null, 'this is a random value') AS ROW(int, varchar))), current_date", 1);
        MaterializedResult result = computeActual("SELECT * from test_nested_table");
        assertEquals(result.getRowCount(), 1);

        dropTable(session, "test_nested_table");

        @Language("SQL") String createTable2 = "" +
                "CREATE TABLE test_nested_table (" +
                " int INTEGER" +
                ", arr ARRAY(ROW(id INTEGER, vc VARCHAR))" +
                ", big BIGINT" +
                ", rl REAL" +
                ", dbl DOUBLE" +
                ", mp MAP(INTEGER, ARRAY(VARCHAR))" +
                ", dec DECIMAL(5,2)" +
                ", str ROW(id INTEGER, vc VARCHAR, arr ARRAY(INTEGER))" +
                ", vc VARCHAR)" +
                " WITH (partitioning = ARRAY['int']," +
                " \"write.format.default\" = '" + fileFormat + "'" +
                ")";

        assertUpdate(session, createTable2);

        assertUpdate(session, "INSERT INTO test_nested_table " +
                " select 1, array[cast(row(1, null) as row(int, varchar)), cast(row(2, 'dos') as row(int, varchar))], BIGINT '1', REAL '1.0', DOUBLE '1.0', " +
                "map(array[1,2], array[array['ek', 'one'], array['don', 'do', 'two']]), CAST(1.0 as DECIMAL(5,2)), " +
                "CAST(ROW(1, 'this is a random value', null) AS ROW(int, varchar, array(int))), 'one'", 1);
        result = computeActual("SELECT * from test_nested_table");
        assertEquals(result.getRowCount(), 1);

        @Language("SQL") String createTable3 = "" +
                "CREATE TABLE test_nested_table2 WITH (partitioning = ARRAY['int']) as select * from test_nested_table";

        assertUpdate(session, createTable3, 1);

        result = computeActual("SELECT * from test_nested_table2");
        assertEquals(result.getRowCount(), 1);

        dropTable(session, "test_nested_table");
        dropTable(session, "test_nested_table2");
    }

    @DataProvider(name = "testPartitionedByTimeProvider")
    public Object[][] testPartitionedByTimeProvider()
    {
        return new Object[][] {
                {false, FileFormat.PARQUET},
                {false, FileFormat.ORC},
                {true, FileFormat.PARQUET},
                {true, FileFormat.ORC}
        };
    }

    @Test(dataProvider = "testPartitionedByTimeProvider")
    private void testSelectOrPartitionedByTime(boolean partitioned, FileFormat format)
    {
        String tableName = format("test_%s_by_time", partitioned ? "partitioned" : "selected");
        try {
            String partitioning = partitioned ? ", partitioning = ARRAY['x']" : "";
            assertUpdate(format("CREATE TABLE %s (x TIME, y BIGINT) WITH (\"write.format.default\" = '%s'%s)", tableName, format, partitioning));
            assertUpdate(format("INSERT INTO %s VALUES (TIME '10:12:34', 12345)", tableName), 1);
            assertQuery(format("SELECT COUNT(*) FROM %s", tableName), "SELECT 1");
            assertQuery(format("SELECT x FROM %s", tableName), "SELECT CAST('10:12:34' AS TIME)");
            assertUpdate(format("INSERT INTO %s VALUES (TIME '9:00:00', 67890)", tableName), 1);
            assertQuery(format("SELECT COUNT(*) FROM %s", tableName), "SELECT 2");
            assertQuery(format("SELECT x FROM %s WHERE y = 12345", tableName), "SELECT CAST('10:12:34' AS TIME)");
            assertQuery(format("SELECT x FROM %s WHERE y = 67890", tableName), "SELECT CAST('9:00:00' AS TIME)");
            assertUpdate(format("INSERT INTO %s VALUES (TIME '10:12:34', 54321)", tableName), 1);
            assertQuery(
                    format("SELECT x, COUNT(*) FROM %s GROUP BY x ORDER BY x", tableName),
                    "SELECT CAST('9:00:00' AS TIME), 1 UNION ALL SELECT CAST('10:12:34' AS TIME), 2");
            assertQuery(format("SELECT y FROM %s WHERE x = time '10:12:34'", tableName), "values 12345, 54321");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    @Test
    public void testReadEmptyTable()
    {
        assertUpdate("CREATE TABLE test_read_empty (a bigint, b double, c varchar)");
        assertTrue(getQueryRunner().tableExists(getSession(), "test_read_empty"));
        assertQuery("SELECT * FROM test_read_empty", "SELECT * FROM region WHERE name='not_exist'");

        assertUpdate("DROP TABLE test_read_empty");
        assertFalse(getQueryRunner().tableExists(getSession(), "test_create_original"));
    }

    @Test
    public void testBasicTableStatistics()
    {
        Session session = getSession();
        String tableName = "test_basic_table_statistics";
        assertUpdate(format("CREATE TABLE %s (col REAL)", tableName));

        assertQuery(session, "SHOW STATS FOR " + tableName,
                "VALUES " +
                        "  ('col', null, null, null, NULL, NULL, NULL, NULL), " +
                        "  (NULL, NULL, NULL, NULL, 0e0, NULL, NULL, NULL)");

        assertUpdate("INSERT INTO " + tableName + " VALUES -10", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES 100", 1);

        assertQuery(session, "SHOW STATS FOR " + tableName,
                "VALUES " +
                        "  ('col', NULL, NULL, 0.0, NULL, '-10.0', '100.0', NULL), " +
                        "  (NULL, NULL, NULL, NULL, 2e0, NULL, NULL, NULL)");
        assertUpdate("INSERT INTO " + tableName + " VALUES 200", 1);
        assertQuery(session, "SHOW STATS FOR " + tableName,
                "VALUES " +
                        "  ('col', NULL, NULL, 0.0, NULL, '-10.0', '200.0', NULL), " +
                        "  (NULL, NULL, NULL, NULL, 3e0, NULL, NULL, NULL)");

        dropTable(session, tableName);
    }

    @Test
    public void testPartitionedByRealWithNaNInf()
    {
        Session session = getSession();
        String tableName = "test_partitioned_by_real";

        assertUpdate("CREATE TABLE " + tableName + " (id integer, part real) WITH(partitioning = ARRAY['part'])");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, real 'NaN')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, real 'Infinity')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (3, real '-Infinity')", 1);

        assertQuery("SELECT part FROM " + tableName + " WHERE id = 1", "VALUES cast('NaN' as real)");
        assertQuery("SELECT part FROM " + tableName + " WHERE id = 2", "VALUES cast('Infinity' as real)");
        assertQuery("SELECT part FROM " + tableName + " WHERE id = 3", "VALUES cast('-Infinity' as real)");

        assertQuery("SELECT id FROM " + tableName + " WHERE is_nan(part)", "VALUES 1");
        assertQuery("SELECT id FROM " + tableName + " WHERE is_infinite(part) ORDER BY id", "VALUES (2),(3)");

        dropTable(session, tableName);
    }

    @Test
    public void testPartitionedByDoubleWithNaNInf()
    {
        Session session = getSession();
        String tableName = "test_partitioned_by_double";

        assertUpdate("CREATE TABLE " + tableName + " (id integer, part double) WITH(partitioning = ARRAY['part'])");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, double 'NaN')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, double 'Infinity')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (3, double '-Infinity')", 1);

        assertQuery("SELECT part FROM " + tableName + " WHERE id = 1", "VALUES cast('NaN' as double)");
        assertQuery("SELECT part FROM " + tableName + " WHERE id = 2", "VALUES cast('Infinity' as double)");
        assertQuery("SELECT part FROM " + tableName + " WHERE id = 3", "VALUES cast('-Infinity' as double)");

        assertQuery("SELECT id FROM " + tableName + " WHERE is_nan(part)", "VALUES 1");
        assertQuery("SELECT id FROM " + tableName + " WHERE is_infinite(part) ORDER BY id", "VALUES (2),(3)");

        dropTable(session, tableName);
    }

    @Test
    public void testFilterBySubfieldOfRowType()
    {
        Session session = getSession();
        String tableName = "test_filter_by_subfieldofrow";

        assertUpdate("CREATE TABLE " + tableName + " (id integer, r row(a integer, b varchar))");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, (1, '1001'))", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, (2, '1002'))", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (3, (3, '1003'))", 1);

        assertQuery("SELECT * FROM " + tableName + " WHERE r.a = 1", "VALUES (1, (1, '1001'))");
        assertQuery("SELECT * FROM " + tableName + " WHERE r.b = '1003'", "VALUES (3, (3, '1003'))");
        assertQuery("SELECT * FROM " + tableName + " WHERE r.a > 1 and r.b < '1003'", "VALUES (2, (2, '1002'))");
        dropTable(session, tableName);
    }

    protected String getLocation(String schema, String table)
    {
        return null;
    }

    protected Path getCatalogDirectory()
    {
        java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
        return new Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI());
    }

    protected Table getIcebergTable(ConnectorSession session, String namespace, String tableName)
    {
        return null;
    }

    protected void createTableWithMergeOnRead(Session session, String schema, String tableName)
    {
        assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer) WITH (\"format-version\" = '2')");

        CatalogManager catalogManager = getDistributedQueryRunner().getCoordinator().getCatalogManager();
        ConnectorId connectorId = catalogManager.getCatalog(ICEBERG_CATALOG).get().getConnectorId();

        Table icebergTable = getIcebergTable(session.toConnectorSession(connectorId), schema, tableName);

        UpdateProperties updateProperties = icebergTable.updateProperties();
        updateProperties.set("write.merge.mode", "merge-on-read");
        updateProperties.commit();
    }

    protected void cleanupTableWithMergeOnRead(String tableName)
    {
        Session session = getSession();
        dropTable(session, tableName);
    }

    @Test
    public void testMergeOnReadEnabled()
    {
        String schemaName = getSession().getSchema().get();
        String tableName = "test_merge_on_read_enabled";
        try {
            Session session = getSession();

            createTableWithMergeOnRead(session, schemaName, tableName);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (1, 1)", 1);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (2, 2)", 1);
            assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, 1), (2, 2)");
        }
        finally {
            cleanupTableWithMergeOnRead(tableName);
        }
    }

    @Test
    public void testMergeOnReadDisabled()
    {
        String tableName = "test_merge_on_read_disabled";
        @Language("RegExp") String errorMessage = "merge-on-read table mode not supported yet";
        try {
            Session session = Session.builder(getSession())
                    .setCatalogSessionProperty(ICEBERG_CATALOG, "merge_on_read_enabled", "false")
                    .build();

            createTableWithMergeOnRead(session, session.getSchema().get(), tableName);
            assertQueryFails(session, "INSERT INTO " + tableName + " VALUES (1, 1)", errorMessage);
            assertQueryFails(session, "INSERT INTO " + tableName + " VALUES (2, 2)", errorMessage);
            assertQueryFails(session, "SELECT * FROM " + tableName, errorMessage);
        }
        finally {
            cleanupTableWithMergeOnRead(tableName);
        }
    }

    @Test
    public void testTableStatisticsTimestamp()
    {
        Session session = Session.builder(getSession())
                .setTimeZoneKey(UTC_KEY)
                .build();
        String tableName = "test_table_statistics_timestamp";
        assertUpdate(session, format("CREATE TABLE %s (col TIMESTAMP)", tableName));

        assertQuery(session, "SHOW STATS FOR " + tableName,
                "VALUES " +
                        "  ('col', null, null, null, NULL, NULL, NULL, NULL), " +
                        "  (NULL, NULL, NULL, NULL, 0e0, NULL, NULL, NULL)");

        assertUpdate(session, "INSERT INTO " + tableName + " VALUES TIMESTAMP '2021-01-02 09:04:05.321'", 1);
        assertUpdate(session, "INSERT INTO " + tableName + " VALUES TIMESTAMP '2022-12-22 10:07:08.456'", 1);

        assertQuery(session, "SHOW STATS FOR " + tableName,
                "VALUES " +
                        "  ('col', NULL, NULL, 0.0, NULL, '2021-01-02 09:04:05.321', '2022-12-22 10:07:08.456', NULL), " +
                        "  (NULL, NULL, NULL, NULL, 2e0, NULL, NULL, NULL)");
        dropTable(session, tableName);
    }

    @Test
    public void testDatePartitionedByYear()
    {
        Session session = getSession();
        String tableName = "test_date_partitioned_by_year";

        assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 date) WITH(partitioning = ARRAY['year(c2)'])");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, date '2022-10-01')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, date '2023-11-02')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (3, date '1980-01-01'), (4, date '1990-02-02')", 2);

        assertQuery("SELECT c2_year, row_count, file_count FROM " + "\"" + tableName + "$partitions\" ORDER BY c2_year",
                "VALUES (10, 1, 1), (20, 1, 1), (52, 1, 1), (53, 1, 1)");
        assertQuery("SELECT * FROM " + tableName + " WHERE year(c2) = 2023", "VALUES (2, '2023-11-02')");

        dropTable(session, tableName);
    }

    @Test
    public void testDatePartitionedByMonth()
    {
        Session session = getSession();
        String tableName = "test_date_partitioned_by_month";

        assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 date) WITH(partitioning = ARRAY['month(c2)'])");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, date '2022-01-01')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, date '2023-11-02')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (3, date '1970-02-02'), (4, date '1971-01-02')", 2);

        assertQuery("SELECT c2_month, row_count, file_count FROM " + "\"" + tableName + "$partitions\" ORDER BY c2_month",
                "VALUES (1, 1, 1), (12, 1, 1), (624, 1, 1), (646, 1, 1)");
        assertQuery("SELECT * FROM " + tableName + " WHERE month(c2) = 11", "VALUES (2, '2023-11-02')");

        dropTable(session, tableName);
    }

    @Test
    public void testDatePartitionedByDay()
    {
        Session session = getSession();
        String tableName = "test_date_partitioned_by_day";

        assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 date) WITH(partitioning = ARRAY['day(c2)'])");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, date '2022-10-01')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, date '2023-11-02')", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (3, date '1970-10-01'), (4, date '1971-11-05')", 2);

        assertQuery("SELECT c2_day, row_count, file_count FROM " + "\"" + tableName + "$partitions\" ORDER BY c2_day",
                "VALUES ('1970-10-01', 1, 1), ('1971-11-05', 1, 1), ('2022-10-01', 1, 1), ('2023-11-02', 1, 1)");
        assertQuery("SELECT * FROM " + tableName + " WHERE day(c2) = 2", "VALUES (2, '2023-11-02')");

        dropTable(session, tableName);
    }

    @DataProvider(name = "timezones")
    public Object[][] timezones()
    {
        return new Object[][] {
                {"UTC", true},
                {"America/Los_Angeles", true},
                {"Asia/Shanghai", true},
                {"None", false}};
    }

    @Test(dataProvider = "timezones")
    public void testTimestampPartitionedByYear(String zoneId, boolean legacyTimestamp)
    {
        Session session = sessionForTimezone(zoneId, legacyTimestamp);
        String tableName = "test_timestamp_partitioned_by_year";

        try {
            assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 timestamp) WITH(partitioning = ARRAY['year(c2)'])");
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (1, timestamp '2022-10-01 00:00:00.000')", 1);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (2, timestamp '2023-11-02 12:10:31.315')", 1);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (3, timestamp '1980-01-01 12:10:31.315'), (4, timestamp '1990-01-01 12:10:31.315')", 2);

            assertQuery(session, "SELECT c2_year, row_count, file_count FROM " + "\"" + tableName + "$partitions\" ORDER BY c2_year",
                    "VALUES (10, 1, 1), (20, 1, 1), (52, 1, 1), (53, 1, 1)");
            assertQuery(session, "SELECT * FROM " + tableName + " WHERE year(c2) = 2023", "VALUES (2, '2023-11-02 12:10:31.315')");
        }
        finally {
            dropTable(session, tableName);
        }
    }

    @Test(dataProvider = "timezones")
    public void testTimestampPartitionedByMonth(String zoneId, boolean legacyTimestamp)
    {
        Session session = sessionForTimezone(zoneId, legacyTimestamp);
        String tableName = "test_timestamp_partitioned_by_month";

        try {
            assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 timestamp) WITH(partitioning = ARRAY['month(c2)'])");
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (1, timestamp '2022-10-01 00:00:00.000')", 1);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (2, timestamp '2023-11-02 12:10:31.315')", 1);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (3, timestamp '1970-02-02 12:10:31.315'), (4, timestamp '1971-01-02 12:10:31.315')", 2);

            assertQuery(session, "SELECT c2_month, row_count, file_count FROM " + "\"" + tableName + "$partitions\" ORDER BY c2_month",
                    "VALUES (1, 1, 1), (12, 1, 1), (633, 1, 1), (646, 1, 1)");
            assertQuery(session, "SELECT * FROM " + tableName + " WHERE month(c2) = 11", "VALUES (2, '2023-11-02 12:10:31.315')");
        }
        finally {
            dropTable(session, tableName);
        }
    }

    @Test(dataProvider = "timezones")
    public void testTimestampPartitionedByDay(String zoneId, boolean legacyTimestamp)
    {
        Session session = sessionForTimezone(zoneId, legacyTimestamp);
        String tableName = "test_timestamp_partitioned_by_day";

        try {
            assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 timestamp) WITH(partitioning = ARRAY['day(c2)'])");
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (1, timestamp '2022-10-01 00:00:00.000')", 1);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (2, timestamp '2023-11-02 12:10:31.315')", 1);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (3, timestamp '1970-01-05 00:00:00.000'), (4, timestamp '1971-01-10 12:10:31.315')", 2);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (5, TIMESTAMP '1969-12-30 18:47:33.345')," +
                    " (6, TIMESTAMP '1969-12-31 00:00:00.000')," +
                    " (7, TIMESTAMP '1969-12-31 05:06:07.234')", 3);

            assertQuery(session, "SELECT c2_day, row_count, file_count, c2.min, c2.max FROM " + "\"" + tableName + "$partitions\" ORDER BY c2_day",
                    "VALUES ('1970-01-05', 1, 1, timestamp '1970-01-05 00:00:00.000', timestamp '1970-01-05 00:00:00.000'), " +
                            "('1971-01-10', 1, 1, timestamp '1971-01-10 12:10:31.315', timestamp '1971-01-10 12:10:31.315'), " +
                            "('2022-10-01', 1, 1, timestamp '2022-10-01 00:00:00.000', timestamp '2022-10-01 00:00:00.000'), " +
                            "('1969-12-30', 1, 1, timestamp '1969-12-30 18:47:33.345', timestamp '1969-12-30 18:47:33.345'), " +
                            "('1969-12-31', 2, 1, timestamp '1969-12-31 00:00:00.000', timestamp '1969-12-31 05:06:07.234'), " +
                            "('2023-11-02', 1, 1, timestamp '2023-11-02 12:10:31.315', timestamp '2023-11-02 12:10:31.315')");
            assertQuery(session, "SELECT * FROM " + tableName + " WHERE day(c2) = 2", "VALUES (2, '2023-11-02 12:10:31.315')");

            assertQuery(session,
                    "SELECT * FROM " + tableName + " WHERE c2 = TIMESTAMP '1969-12-31 05:06:07.234'",
                    "VALUES (7, TIMESTAMP '1969-12-31 05:06:07.234')");

            assertQuery(session,
                    "SELECT * FROM " + tableName + " WHERE CAST(c2 as DATE) = DATE '1969-12-31'",
                    "VALUES (6, TIMESTAMP '1969-12-31 00:00:00.000'), (7, TIMESTAMP '1969-12-31 05:06:07.234')");
        }
        finally {
            dropTable(session, tableName);
        }
    }

    @Test(dataProvider = "timezones")
    public void testTimestampPartitionedByHour(String zoneId, boolean legacyTimestamp)
    {
        Session session = sessionForTimezone(zoneId, legacyTimestamp);
        String tableName = "test_timestamp_partitioned_by_hour";

        try {
            assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 timestamp) WITH(partitioning = ARRAY['hour(c2)'])");
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (1, timestamp '2022-10-01 10:00:00.000')", 1);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (2, timestamp '2023-11-02 12:10:31.315')", 1);
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (3, timestamp '1970-01-01 10:10:31.315'), (4, timestamp '1970-01-02 10:10:31.315')", 2);

            assertQuery(session, "SELECT c2_hour, row_count, file_count FROM " + "\"" + tableName + "$partitions\" ORDER BY c2_hour",
                    "VALUES (10, 1, 1), (34, 1, 1), (462394, 1, 1), (471924, 1, 1)");
            assertQuery(session, "SELECT * FROM " + tableName + " WHERE hour(c2) = 12", "VALUES (2, '2023-11-02 12:10:31.315')");
        }
        finally {
            dropTable(session, tableName);
        }
    }

    @Test
    public void testPartitionTransformOnTimestampWithTimeZone()
    {
        //TODO: Not yet support identity transform for timestamp with time zone, which was supported by Iceberg
        assertUpdate("create table test_identity_transform_timestamp_tz(col_timestamp_tz timestamp with time zone)" +
                "with (partitioning = ARRAY['col_timestamp_tz'])");
        assertQueryFails("insert into test_identity_transform_timestamp_tz " +
                        "values(CAST('2023-01-01 00:00:00.000 UTC' AS TIMESTAMP WITH TIME ZONE))",
                "Type not supported as partition column: timestamp with time zone");
        assertUpdate("drop table if exists test_identity_transform_timestamp_tz");

        //TODO: Not yet support bucket transform for timestamp with time zone, which was supported by Iceberg
        assertUpdate("create table test_bucket_transform_timestamp_tz(col_timestamp_tz timestamp with time zone)" +
                "with (partitioning = ARRAY['bucket(col_timestamp_tz, 2)'])");
        assertQueryFails("insert into test_bucket_transform_timestamp_tz " +
                        "values(CAST('2023-01-01 00:00:00.000 UTC' AS TIMESTAMP WITH TIME ZONE))",
                "Unsupported type for 'bucket': 1000: col_timestamp_tz_bucket: bucket\\[2\\]\\(1\\)");
        assertUpdate("drop table if exists test_bucket_transform_timestamp_tz");

        //TODO: Not yet support year transform for timestamp with time zone, which was supported by Iceberg
        assertUpdate("create table test_year_transform_timestamp_tz(col_timestamp_tz timestamp with time zone)" +
                "with (partitioning = ARRAY['year(col_timestamp_tz)'])");
        assertQueryFails("insert into test_year_transform_timestamp_tz " +
                        "values(CAST('2023-01-01 00:00:00.000 UTC' AS TIMESTAMP WITH TIME ZONE))",
                "Unsupported type for 'year': 1000: col_timestamp_tz_year: year\\(1\\)");
        assertUpdate("drop table if exists test_year_transform_timestamp_tz");

        //TODO: Not yet support month transform for timestamp with time zone, which was supported by Iceberg
        assertUpdate("create table test_month_transform_timestamp_tz(col_timestamp_tz timestamp with time zone)" +
                "with (partitioning = ARRAY['month(col_timestamp_tz)'])");
        assertQueryFails("insert into test_month_transform_timestamp_tz " +
                        "values(CAST('2023-01-01 00:00:00.000 UTC' AS TIMESTAMP WITH TIME ZONE))",
                "Unsupported type for 'month': 1000: col_timestamp_tz_month: month\\(1\\)");
        assertUpdate("drop table if exists test_month_transform_timestamp_tz");

        //TODO: Not yet support day transform for timestamp with time zone, which was supported by Iceberg
        assertUpdate("create table test_day_transform_timestamp_tz(col_timestamp_tz timestamp with time zone)" +
                "with (partitioning = ARRAY['day(col_timestamp_tz)'])");
        assertQueryFails("insert into test_day_transform_timestamp_tz " +
                        "values(CAST('2023-01-01 00:00:00.000 UTC' AS TIMESTAMP WITH TIME ZONE))",
                "Unsupported type for 'day': 1000: col_timestamp_tz_day: day\\(1\\)");
        assertUpdate("drop table if exists test_day_transform_timestamp_tz");

        //TODO: Not yet support hour transform for timestamp with time zone, which was supported by Iceberg
        assertUpdate("create table test_hour_transform_timestamp_tz(col_timestamp_tz timestamp with time zone)" +
                "with (partitioning = ARRAY['hour(col_timestamp_tz)'])");
        assertQueryFails("insert into test_hour_transform_timestamp_tz " +
                        "values(CAST('2023-01-01 00:00:00.000 UTC' AS TIMESTAMP WITH TIME ZONE))",
                "Unsupported type for 'hour': 1000: col_timestamp_tz_hour: hour\\(1\\)");
        assertUpdate("drop table if exists test_hour_transform_timestamp_tz");
    }

    @Test
    public void testPartitionTransformOnUUID()
    {
        //TODO: Not yet support identity transform for uuid, which was supported by Iceberg
        assertUpdate("create table test_identity_transform_uuid(col_uuid uuid)" +
                "with (partitioning = ARRAY['col_uuid'])");
        assertQueryFails("insert into test_identity_transform_uuid " +
                        "values(cast ('d2177dd0-eaa2-11de-a572-001b779c76e1' as uuid))",
                "Type not supported as partition column: uuid");
        assertUpdate("drop table if exists test_identity_transform_uuid");

        //TODO: Not yet support bucket transform for uuid, which was supported by Iceberg
        assertUpdate("create table test_bucket_transform_uuid(col_uuid uuid)" +
                "with (partitioning = ARRAY['bucket(col_uuid, 2)'])");
        assertQueryFails("insert into test_bucket_transform_uuid " +
                        "values(cast ('d2177dd0-eaa2-11de-a572-001b779c76e1' as uuid))",
                "Unsupported type for 'bucket': 1000: col_uuid_bucket: bucket\\[2\\]\\(1\\)");
        assertUpdate("drop table if exists test_bucket_transform_uuid");
    }

    @Test
    public void testBucketTransformOnTime()
    {
        testWithAllFileFormats(this::testBucketTransformsOnTimeForFormat);
    }

    private void testBucketTransformsOnTimeForFormat(Session session, FileFormat format)
    {
        String select = "SELECT a_bucket, row_count, a.min AS a_min, a.max AS a_max, b.min AS b_min, b.max AS b_max FROM \"test_bucket_transform_on_time$partitions\"";

        assertUpdate(session, format("CREATE TABLE test_bucket_transform_on_time (a TIME, b BIGINT)" +
                " WITH (\"write.format.default\" = '%s', partitioning = ARRAY['bucket(a, 4)'])", format.name()));
        String insertSql = "INSERT INTO test_bucket_transform_on_time VALUES" +
                "(time '01:02:03.123', 1)," +
                "(time '21:22:50.002', 2)," +
                "(time '12:13:14.345', 3)," +
                "(time '00:00:01.001', 4)," +
                "(time '23:23:59.999', 5)," +
                "(time '00:00:00.000', 6)," +
                "(time '07:31:55.425', 7)";
        assertUpdate(session, insertSql, 7);

        assertQuery(session, "SELECT COUNT(*) FROM \"test_bucket_transform_on_time$partitions\"", "SELECT 4");

        assertQuery(session, select + " WHERE a_bucket = 0", "VALUES(0, 2, time '00:00:00.000', time '12:13:14.345', 3, 6)");
        assertQuery(session, select + " WHERE a_bucket = 1", "VALUES(1, 1, time '23:23:59.999', time '23:23:59.999', 5, 5)");
        assertQuery(session, select + " WHERE a_bucket = 2", "VALUES(2, 1, time '21:22:50.002', time '21:22:50.002', 2, 2)");
        assertQuery(session, select + " WHERE a_bucket = 3", "VALUES(3, 3, time '00:00:01.001', time '07:31:55.425', 1, 7)");

        assertQuery(session, "select * from test_bucket_transform_on_time where a = time '01:02:03.123'",
                "VALUES(time '01:02:03.123', 1)");
        assertQuery(session, "select * from test_bucket_transform_on_time where a > time '01:02:03.123' and a <= time '12:13:14.345'",
                "VALUES(time '07:31:55.425', 7), (time '12:13:14.345', 3)");
        assertQuery(session, "select * from test_bucket_transform_on_time where a in (time '00:00:01.001', time '21:22:50.002')",
                "VALUES(time '00:00:01.001', 4), (time '21:22:50.002', 2)");
        dropTable(session, "test_bucket_transform_on_time");
    }

    @Test
    public void testBucketTransformOnTimestamp()
    {
        //TODO: Not yet support bucket transform for timestamp, which was supported by Iceberg
        assertUpdate("create table test_bucket_transform_timestamp(col_timestamp timestamp)" +
                "with (partitioning = ARRAY['bucket(col_timestamp, 2)'])");
        assertQueryFails("insert into test_bucket_transform_timestamp values(timestamp '1984-01-08 01:02:03.123')",
                "Unsupported type for 'bucket': 1000: col_timestamp_bucket: bucket\\[2\\]\\(1\\)");
        assertUpdate("drop table if exists test_bucket_transform_timestamp");
    }

    @Test
    public void testRegisterTable()
    {
        String schemaName = getSession().getSchema().get();
        String tableName = "register";
        // Create a `noise` table in the same schema to test that the `getLocation` method finds and returns the right metadata location.
        String noiseTableName = "register1";
        assertUpdate("CREATE TABLE " + noiseTableName + " (id integer, value integer)");
        assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)");
        assertUpdate("INSERT INTO " + tableName + " VALUES(1, 1)", 1);

        String metadataLocation = getLocation(schemaName, tableName);

        String newTableName = tableName + "_new";
        assertUpdate("CALL system.register_table('" + schemaName + "', '" + newTableName + "', '" + metadataLocation + "')");
        assertQuery("SELECT * FROM " + newTableName, "VALUES (1, 1)");

        unregisterTable(schemaName, newTableName);
        dropTable(getSession(), tableName);
        dropTable(getSession(), noiseTableName);
    }

    @Test
    public void testRegisterTableAndInsert()
    {
        String schemaName = getSession().getSchema().get();
        String tableName = "register_insert";
        assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)");
        assertUpdate("INSERT INTO " + tableName + " VALUES(1, 1)", 1);

        String metadataLocation = getLocation(schemaName, tableName);

        String newTableName = tableName + "_new";
        assertUpdate("CALL system.register_table('" + schemaName + "', '" + newTableName + "', '" + metadataLocation + "')");
        assertUpdate("INSERT INTO " + newTableName + " VALUES(2, 2)", 1);
        assertQuery("SELECT * FROM " + newTableName, "VALUES (1, 1), (2, 2)");

        assertQuery("SELECT * FROM " + tableName, "VALUES (1, 1)");

        unregisterTable(schemaName, newTableName);
        dropTable(getSession(), tableName);
    }

    @Test
    public void testRegisterTableWithFileName()
    {
        String schemaName = getSession().getSchema().get();
        String tableName = "register_filename";
        assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)");
        assertUpdate("INSERT INTO " + tableName + " VALUES(1, 1)", 1);

        String metadataLocation = getLocation(schemaName, tableName);
        String metadataFileName = getMetadataFileLocation(getSession().toConnectorSession(), getHdfsEnvironment(), schemaName, tableName, metadataLocation);

        // Register new table with procedure
        String newTableName = tableName + "_new";
        assertUpdate("CALL system.register_table('" + schemaName + "', '" + newTableName + "', '" + metadataLocation + "', '" + metadataFileName + "')");
        assertQuery("SELECT * FROM " + newTableName, "VALUES (1, 1)");

        unregisterTable(schemaName, newTableName);
        dropTable(getSession(), tableName);
    }

    @Test
    public void testRegisterTableWithInvalidLocation()
    {
        String schemaName = getSession().getSchema().get();
        String tableName = "register_invalid";
        assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)");
        assertUpdate("INSERT INTO " + tableName + " VALUES(1, 1)", 1);

        String metadataLocation = getLocation(schemaName, tableName) + "_invalid";

        @Language("RegExp") String errorMessage = format("Unable to find metadata at location %s/%s", metadataLocation, METADATA_FOLDER_NAME);
        assertQueryFails("CALL system.register_table ('" + schemaName + "', '" + tableName + "', '" + metadataLocation + "')", errorMessage);

        dropTable(getSession(), tableName);
    }

    @Test
    public void testUnregisterTable()
    {
        String schemaName = getSession().getSchema().get();
        String tableName = "unregister";
        assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)");

        // Unregister table with procedure
        assertUpdate("CALL system.unregister_table('" + schemaName + "', '" + tableName + "')");
        assertFalse(getQueryRunner().tableExists(getSession(), tableName));
    }

    @Test
    public void testColumnNameWithSpace()
    {
        Session session = getSession();
        String tableName = "test_column_name_with_space";
        String columnName = "column a";
        assertUpdate(format("CREATE TABLE %s (\"%s\" int)", tableName, columnName));
        assertUpdate(format("INSERT INTO %s VALUES (123), (456)", tableName), 2);
        assertQuery(format("SELECT \"%s\" FROM %s", columnName, tableName), "VALUES (123), (456)");
        dropTable(session, tableName);
    }

    @Test
    public void testDeleteUnPartitionedTable()
    {
        Session session = getSession();

        // Test with default delete_mode i.e. merge-on-read
        String tableNameMor = "test_delete_mor";

        assertUpdate("CREATE TABLE " + tableNameMor + " (id integer, value integer) WITH (\"format-version\" = '2')");
        assertUpdate("INSERT INTO " + tableNameMor + " VALUES (1, 10)", 1);
        assertUpdate("INSERT INTO " + tableNameMor + " VALUES (2, 13)", 1);
        assertUpdate("INSERT INTO " + tableNameMor + " VALUES (3, 5)", 1);
        assertQuery("SELECT * FROM " + tableNameMor, "VALUES (1, 10), (2, 13), (3, 5)");
        assertUpdate("DELETE FROM " + tableNameMor + " WHERE id = 1", 1);
        assertQuery("SELECT * FROM " + tableNameMor, "VALUES (2, 13), (3, 5)");

        dropTable(session, tableNameMor);

        // Test with delete_mode set to copy-on-write
        String tableNameCow = "test_delete_cow";
        @Language("RegExp") String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely. Configure write.delete.mode table property to allow row level deletions.";

        assertUpdate("CREATE TABLE " + tableNameCow + " (id integer, value integer) WITH (\"format-version\" = '2', \"write.delete.mode\" = 'copy-on-write')");
        assertUpdate("INSERT INTO " + tableNameCow + " VALUES (1, 5)", 1);
        assertQuery("SELECT * FROM " + tableNameCow, "VALUES (1, 5)");
        assertQueryFails("DELETE FROM " + tableNameCow + " WHERE value = 5", errorMessage);

        dropTable(session, tableNameCow);
    }

    @Test
    public void testDeletePartitionedTable()
    {
        Session session = getSession();

        // Test with default delete_mode i.e. merge-on-read:
        String tableNameMor = "test_delete_partitioned_mor";

        assertUpdate("CREATE TABLE " + tableNameMor + " (id integer, value integer) WITH (\"format-version\" = '2', partitioning = Array['id'])");
        assertUpdate("INSERT INTO " + tableNameMor + " VALUES (1, 10)", 1);
        assertUpdate("INSERT INTO " + tableNameMor + " VALUES (2, 13)", 1);
        assertUpdate("INSERT INTO " + tableNameMor + " VALUES (3, 5)", 1);
        assertUpdate("INSERT INTO " + tableNameMor + " VALUES (4, 13)", 1);
        assertUpdate("INSERT INTO " + tableNameMor + " VALUES (5, 1)", 1);
        assertUpdate("INSERT INTO " + tableNameMor + " VALUES (6, 1)", 1);
        assertQuery("SELECT * FROM " + tableNameMor, "VALUES (1, 10), (2, 13), (3, 5), (4, 13), (5, 1), (6, 1)");

        // 1. delete by partitioning column
        // 2. delete by non-partitioning column
        assertUpdate("DELETE FROM " + tableNameMor + " WHERE id = 1", 1);
        assertQuery("SELECT * FROM " + tableNameMor, "VALUES (2, 13), (3, 5), (4, 13), (5, 1), (6, 1)");
        assertUpdate("DELETE FROM " + tableNameMor + " WHERE value = 13", 2);
        assertQuery("SELECT * FROM " + tableNameMor, "VALUES (3, 5), (5, 1), (6, 1)");

        dropTable(session, tableNameMor);

        // Test with delete_mode set to copy-on-write
        String tableNameCow = "test_delete_partitioned_cow";
        @Language("RegExp") String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely. Configure write.delete.mode table property to allow row level deletions.";

        assertUpdate("CREATE TABLE " + tableNameCow + " (id integer, value integer) WITH (\"format-version\" = '2', partitioning = Array['id'], \"write.delete.mode\" = 'copy-on-write')");
        assertUpdate("INSERT INTO " + tableNameCow + " VALUES (1, 10)", 1);
        assertUpdate("INSERT INTO " + tableNameCow + " VALUES (2, 1)", 1);
        assertUpdate("INSERT INTO " + tableNameCow + " VALUES (3, 5)", 1);
        assertQuery("SELECT * FROM " + tableNameCow, "VALUES (1, 10), (2, 1), (3, 5)");

        // 1. delete by partitioning column
        // 2. delete by non-partitioning column
        assertUpdate(session, "DELETE FROM " + tableNameCow + " WHERE id = 3", 1);
        assertQuery(session, "SELECT * FROM " + tableNameCow, "VALUES (1, 10), (2, 1)");
        assertQueryFails(session, "DELETE FROM " + tableNameCow + " WHERE value = 1", errorMessage);

        dropTable(session, tableNameCow);
    }

    @Test
    public void testDeleteOnTableWithPartitionEvolution()
    {
        String tableName = "test_delete_on_table_with_partition_evolution";

        Session session = getSession();

        assertUpdate("CREATE TABLE " + tableName + " (a integer, b varchar) WITH (\"format-version\" = '2')");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
        assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001'), (2, '1002'), (3, '1003')");
        assertUpdate("DELETE FROM " + tableName + " WHERE b = '1001'", 1);
        assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002'), (3, '1003')");

        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c integer WITH (partitioning = 'identity')");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 1), (3, '1003', 2)", 3);
        assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', NULL), (1, '1001', 1), (2, '1002', 1), (3, '1003', 2)");

        assertUpdate("DELETE FROM " + tableName + " WHERE b = '1002'", 2);
        assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', NULL), (1, '1001', 1), (3, '1003', 2)");

        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN d integer WITH (partitioning = 'identity')");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1, 1), (2, '1002', 1, 2), (3, '1003', 2, 1)", 3);
        assertUpdate("DELETE FROM " + tableName + " WHERE b = '1003'", 3);
        assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', 1, NULL), (1, '1001', 1, 1), (2, '1002', 1, 2)");

        dropTable(session, tableName);
    }

    @Test
    public void testDeleteOnPartitionedV1Table()
    {
        String tableName = "test_delete_on_partitioned_v1_table";

        Session session = getSession();

        String errorMessage = format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE);
        assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer) WITH (\"format-version\" = '1', partitioning = Array['id'])");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10)", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 1)", 1);
        assertUpdate("INSERT INTO " + tableName + " VALUES (3, 5)", 1);
        assertQuery("SELECT * FROM " + tableName, "VALUES (1, 10), (2, 1), (3, 5)");

        // 1. delete by partitioning column
        // 2. delete by non-partitioning column
        assertUpdate(session, "DELETE FROM " + tableName + " WHERE id = 3", 1);
        assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, 10), (2, 1)");
        assertQueryFails(session, "DELETE FROM " + tableName + " WHERE value = 1", errorMessage);

        dropTable(session, tableName);
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(String version, String mode)
    {
        String tableName = "test_empty_partition_spec_table";
        try {
            // Create a table with no partition
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (\"format-version\" = '" + version + "', \"write.delete.mode\" = '" + mode + "')");

            // Do not insert data, and evaluate the partition spec by adding a partition column `c`
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");

            // Insert data under the new partition spec
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);

            // We can do metadata delete on partition column `c`, because the initial partition spec contains no data
            assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        String tableName = "test_data_deleted_partition_spec_table";
        try {
            // Create a table with partition column `a`, and insert some data under this partition spec
            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (\"format-version\" = '" + version + "', \"write.delete.mode\" = '" + mode + "', partitioning = ARRAY['a'])");
            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec
            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Do not support metadata delete with filter on column `c`, because we have data with old partition spec
            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Do metadata delete on column `a`, because all partition specs contains partition column `a`
            assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2);

            // Then we can do metadata delete on column `c`, because the old partition spec contains no data now
            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
            assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)");
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    @DataProvider(name = "version_and_mode")
    public Object[][] versionAndMode()
    {
        return new Object[][] {
                {"1", "copy-on-write"},
                {"1", "merge-on-read"},
                {"2", "copy-on-write"}};
    }

    @Test(dataProvider = "version_and_mode")
    public void testMetadataDeleteOnNonIdentityPartitionColumn(String version, String mode)
    {
        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
        metadataDeleteOnHourTransform(version, mode, errorMessage);
        metadataDeleteOnDayTransform(version, mode, errorMessage);
        metadataDeleteOnMonthTransform(version, mode, errorMessage);
        metadataDeleteOnYearTransform(version, mode, errorMessage);
        metadataDeleteOnTruncateTransform(version, mode, errorMessage);
    }

    private void metadataDeleteOnHourTransform(String version, String mode, String errorMessage)
    {
        Session session = sessionForTimezone("UTC", true);
        String tableName = "test_hour_transform_timestamp";
        try {
            assertUpdate(session, "CREATE TABLE " + tableName + " (d TIMESTAMP, b BIGINT) WITH (\"format-version\" = '" + version + "', \"write.delete.mode\" = '" + mode + "', partitioning = ARRAY['hour(d)'])");
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (NULL, 101), (TIMESTAMP '1969-01-01 00:01:02.123', 10), (TIMESTAMP '1969-12-31 13:14:02.001', 11), (TIMESTAMP '1970-01-01 08:10:21.000', 1)", 4);
            assertQuery(session, "SELECT * FROM " + tableName, "VALUES (NULL, 101), (TIMESTAMP '1969-01-01 00:01:02.123', 10), (TIMESTAMP '1969-12-31 13:14:02.001', 11), (TIMESTAMP '1970-01-01 08:10:21.000', 1)");

            assertUpdate(session, "delete from " + tableName + " where d is NULL", 1);
            assertQuery(session, "SELECT * FROM " + tableName, "VALUES (TIMESTAMP '1969-01-01 00:01:02.123', 10), (TIMESTAMP '1969-12-31 13:14:02.001', 11), (TIMESTAMP '1970-01-01 08:10:21.000', 1)");
            assertUpdate(session, "delete from " + tableName + " where d >= TIMESTAMP '1970-01-01 00:00:00'", 1);
            assertQuery(session, "SELECT * FROM " + tableName, "VALUES (TIMESTAMP '1969-01-01 00:01:02.123', 10), (TIMESTAMP '1969-12-31 13:14:02.001', 11)");
            assertUpdate(session, "delete from " + tableName + " where d >= DATE '1969-12-31'", 1);
            assertQuery(session, "SELECT * FROM " + tableName, "VALUES (TIMESTAMP '1969-01-01 00:01:02.123', 10)");
            assertQueryFails(session, "DELETE FROM " + tableName + " WHERE d >= TIMESTAMP '1968-05-15 03:00:00.001'", errorMessage);
        }
        finally {
            dropTable(session, tableName);
        }
    }

    public void metadataDeleteOnDayTransform(String version, String mode, String errorMessage)
    {
        Session session = sessionForTimezone("UTC", true);
        String tableName = "test_day_transform_timestamp";
        try {
            assertUpdate(session, "CREATE TABLE " + tableName + " (d TIMESTAMP, b BIGINT) WITH (\"format-version\" = '" + version + "', \"write.delete.mode\" = '" + mode + "', partitioning = ARRAY['day(d)'])");
            assertUpdate(session, "INSERT INTO " + tableName + " VALUES (NULL, 101), (TIMESTAMP '1969-01-01 00:01:02.123', 10), (TIMESTAMP '1969-12-31 13:14:02.001', 11), (TIMESTAMP '1970-01-01 08:10:21.000', 1)", 4);
            assertQuery(session, "SELECT * FROM " + tableName, "VALUES (NULL, 101), (TIMESTAMP '1969-01-01 00:01:02.123', 10), (TIMESTAMP '1969-12-31 13:14:02.001', 11), (TIMESTAMP '1970-01-01 08:10:21.000', 1)");

            assertUpdate(session, "delete from " + tableName + " where d is null", 1);
            assertQuery(session, "SELECT * FROM " + tableName, "VALUES (TIMESTAMP '1969-01-01 00:01:02.123', 10), (TIMESTAMP '1969-12-31 13:14:02.001', 11), (TIMESTAMP '1970-01-01 08:10:21.000', 1)");
            assertUpdate(session, "delete from " + tableName + " where d >= TIMESTAMP '1970-01-01 00:00:00'", 1);
            assertQuery(session, "SELECT * FROM " + tableName, "VALUES (TIMESTAMP '1969-01-01 00:01:02.123', 10), (TIMESTAMP '1969-12-31 13:14:02.001', 11)");
            assertUpdate(session, "delete from " + tableName + " where d >= date '1969-12-31'", 1);
            assertQuery(session, "SELECT * FROM " + tableName, "VALUES (TIMESTAMP '1969-01-01 00:01:02.123', 10)");
            assertQueryFails(session, "DELETE FROM " + tableName + " WHERE d >= TIMESTAMP '1968-05-15 00:00:00.001'", errorMessage);
        }
        finally {
            dropTable(session, tableName);
        }
    }

    public void metadataDeleteOnMonthTransform(String version, String mode, String errorMessage)
    {
        String tableName = "test_month_transform_date";
        try {
            assertUpdate("CREATE TABLE " + tableName + " (d DATE, b BIGINT) WITH (\"format-version\" = '" + version + "', \"write.delete.mode\" = '" + mode + "', partitioning = ARRAY['month(d)'])");
            assertUpdate("INSERT INTO " + tableName + " VALUES (NULL, 101), (DATE '1958-03-02', 10), (DATE '1969-08-31', 11), (DATE '1970-08-01', 1)", 4);
            assertQuery("SELECT * FROM " + tableName, "VALUES (NULL, 101), (DATE '1958-03-02', 10), (DATE '1969-08-31', 11), (DATE '1970-08-01', 1)");

            assertUpdate("delete from " + tableName + " where d is null", 1);
            assertQuery("SELECT * FROM " + tableName, "VALUES (DATE '1958-03-02', 10), (DATE '1969-08-31', 11), (DATE '1970-08-01', 1)");
            assertUpdate("delete from " + tableName + " where d >= DATE '1970-03-01'", 1);
            assertQuery("SELECT * FROM " + tableName, "VALUES (DATE '1958-03-02', 10), (DATE '1969-08-31', 11)");
            assertUpdate("delete from " + tableName + " where d >= date '1969-08-01'", 1);
            assertQuery("SELECT * FROM " + tableName, "VALUES (DATE '1958-03-02', 10)");
            assertQueryFails("DELETE FROM " + tableName + " WHERE d >= date '1958-02-02'", errorMessage);
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    public void metadataDeleteOnYearTransform(String version, String mode, String errorMessage)
    {
        String tableName = "test_year_transform_date";
        try {
            assertUpdate("CREATE TABLE " + tableName + " (d DATE, b BIGINT) WITH (\"format-version\" = '" + version + "', \"write.delete.mode\" = '" + mode + "', partitioning = ARRAY['year(d)'])");
            assertUpdate("INSERT INTO " + tableName + " VALUES (NULL, 101), (DATE '1958-03-02', 10), (DATE '1969-08-31', 11), (DATE '1970-08-01', 1)", 4);
            assertQuery("SELECT * FROM " + tableName, "VALUES (NULL, 101), (DATE '1958-03-02', 10), (DATE '1969-08-31', 11), (DATE '1970-08-01', 1)");

            assertUpdate("delete from " + tableName + " where d is null", 1);
            assertQuery("SELECT * FROM " + tableName, "VALUES (DATE '1958-03-02', 10), (DATE '1969-08-31', 11), (DATE '1970-08-01', 1)");
            assertUpdate("delete from " + tableName + " where d >= DATE '1970-01-01'", 1);
            assertQuery("SELECT * FROM " + tableName, "VALUES (DATE '1958-03-02', 10), (DATE '1969-08-31', 11)");
            assertUpdate("delete from " + tableName + " where d >= date '1968-01-01'", 1);
            assertQuery("SELECT * FROM " + tableName, "VALUES (DATE '1958-03-02', 10)");
            assertQueryFails("DELETE FROM " + tableName + " WHERE d >= date '1957-02-01'", errorMessage);
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    public void metadataDeleteOnTruncateTransform(String version, String mode, String errorMessage)
    {
        String tableName = "test_truncate_transform";
        try {
            assertUpdate("CREATE TABLE " + tableName + " (c VARCHAR, d DECIMAL(9, 2), b BIGINT) WITH (\"format-version\" = '" + version + "', \"write.delete.mode\" = '" + mode + "', partitioning = ARRAY['truncate(c, 2)', 'truncate(d, 10)'])");
            assertUpdate("INSERT INTO " + tableName + " VALUES (NULL, 11.59, 101), ('abcd', 12.34, 10), ('abxy', NULL, 11), ('Kielce', 12.30, 1), ('Kiev', 0.05, 2)", 5);
            assertQuery("SELECT * FROM " + tableName, "VALUES (NULL, 11.59, 101), ('abcd', 12.34, 10), ('abxy', NULL, 11), ('Kielce', 12.30, 1), ('Kiev', 0.05, 2)");

            assertUpdate("delete from " + tableName + " where c is null", 1);
            assertQuery("SELECT * FROM " + tableName, "VALUES ('abcd', 12.34, 10), ('abxy', NULL, 11), ('Kielce', 12.30, 1), ('Kiev', 0.05, 2)");
            assertQueryFails("DELETE FROM " + tableName + " WHERE c >= 'ab'", errorMessage);
            assertUpdate("delete from " + tableName + " where c is not null", 4);
            assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");

            assertUpdate("INSERT INTO " + tableName + " VALUES (NULL, 11.59, 101), ('abcd', 12.34, 10), ('abxy', NULL, 11), ('Kielce', 12.30, 1), ('Kiev', 0.05, 2)", 5);
            assertQuery("SELECT * FROM " + tableName, "VALUES (NULL, 11.59, 101), ('abcd', 12.34, 10), ('abxy', NULL, 11), ('Kielce', 12.30, 1), ('Kiev', 0.05, 2)");
            assertUpdate("delete from " + tableName + " where d is NULL", 1);
            assertQuery("SELECT * FROM " + tableName, "VALUES (NULL, 11.59, 101), ('abcd', 12.34, 10), ('Kielce', 12.30, 1), ('Kiev', 0.05, 2)");
            assertUpdate("delete from " + tableName + " where c is null and d is not null", 1);
            assertQuery("SELECT * FROM " + tableName, "VALUES ('abcd', 12.34, 10), ('Kielce', 12.30, 1), ('Kiev', 0.05, 2)");
            assertQueryFails("DELETE FROM " + tableName + " WHERE d >= 12.20", errorMessage);
        }
        finally {
            dropTable(getSession(), tableName);
        }
    }

    private Session sessionForTimezone(String zoneId, boolean legacyTimestamp)
    {
        SessionBuilder sessionBuilder = Session.builder(getSession())
                .setSystemProperty(LEGACY_TIMESTAMP, String.valueOf(legacyTimestamp));
        if (legacyTimestamp) {
            sessionBuilder.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(zoneId));
        }
        return sessionBuilder.build();
    }

    @Test
    public void testUpdatingInvalidProperty()
    {
        Session session = getSession();
        String tableName = "test_invalid_property_update";
        assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 varchar) WITH(\"commit.retry.num-retries\" = 4)");
        assertThatThrownBy(() -> assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES (\"write.format.default\" = 'PARQUET')"))
                .hasMessage("Updating property write.format.default is not supported currently");
        assertUpdate("DROP TABLE " + tableName);
    }

    @Test
    public void testUpdatingRandomProperty()
    {
        Session session = getSession();
        String tableName = "test_random_property_update";
        assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 varchar) WITH(\"commit.retry.num-retries\" = 4)");
        assertThatThrownBy(() -> assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES (some_config = 2)"))
                .hasMessage("Catalog 'iceberg' does not support table property 'some_config'");
        assertUpdate("DROP TABLE " + tableName);
    }

    @Test
    public void testUpdatingCommitRetries()
    {
        Session session = getSession();
        String tableName = "test_commit_retries_update";
        assertUpdate(session, "CREATE TABLE " + tableName + " (c1 integer, c2 varchar) WITH(\"commit.retry.num-retries\" = 4)");
        assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'commit.retry.num-retries'", "VALUES 4");
        assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES (\"commit.retry.num-retries\" = 5)");
        assertUpdate("ALTER TABLE IF EXISTS " + tableName + " SET PROPERTIES (\"commit.retry.num-retries\" = 6)");
        assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'commit.retry.num-retries'", "VALUES 6");
        assertUpdate("DROP TABLE " + tableName);
    }

    @Test
    public void testUpdateNonExistentTable()
    {
        assertQuerySucceeds("ALTER TABLE IF EXISTS non_existent_test_table1 SET PROPERTIES (\"commit.retry.num-retries\" = 6)");
        assertQueryFails("ALTER TABLE non_existent_test_table2 SET PROPERTIES (\"commit.retry.num-retries\" = 6)",
                format("Table does not exist: iceberg.%s.non_existent_test_table2", getSession().getSchema().get()));
    }

    @Test
    public void testDeprecatedTablePropertiesCreateTable()
    {
        Map<String, String> deprecatedProperties = ImmutableMap.<String, String>builder()
                .put(FILE_FORMAT_PROPERTY, "'ORC'")
                .put(FORMAT_VERSION, "'1'")
                .put(COMMIT_RETRIES, "1234")
                .put(DELETE_MODE, "'copy-on-write'")
                .put(METADATA_PREVIOUS_VERSIONS_MAX, "567")
                .put(METADATA_DELETE_AFTER_COMMIT, "true")
                .put(METRICS_MAX_INFERRED_COLUMN, "123")
                .build();
        deprecatedProperties.forEach((oldProperty, value) -> {
            Session session = Session.builder(getSession()).build();
            String tableName = "test_deprecated_table_properties_" + oldProperty;
            DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();
            MaterializedResult result = queryRunner.execute(session, "CREATE TABLE " + tableName + " (c1 integer) WITH (" + oldProperty + " = " + value + ")");
            assertEquals(result.getWarnings().size(), 1);
            assertTrue(result.getWarnings().stream()
                    .anyMatch(code -> code.getWarningCode().equals(USE_OF_DEPRECATED_TABLE_PROPERTY.toWarningCode())));
            assertUpdate(session, "DROP TABLE " + tableName);
        });
    }

    @Test
    public void testDeprecatedTablePropertiesAlterTable()
    {
        Map<String, String> deprecatedProperties = ImmutableMap.<String, String>builder()
                .put(COMMIT_RETRIES, "1234")
                .build();
        deprecatedProperties.forEach((oldProperty, value) -> {
            Session session = Session.builder(getSession()).build();
            String tableName = "test_deprecated_table_properties_" + oldProperty;
            DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();
            assertQuerySucceeds(session, "CREATE TABLE " + tableName + " (c1 integer) WITH (" + oldProperty + " = " + value + ")");
            MaterializedResult result = queryRunner.execute(session, "ALTER TABLE " + tableName + " SET PROPERTIES (" + oldProperty + " = " + value + ")");
            assertEquals(result.getWarnings().size(), 1);
            assertTrue(result.getWarnings().stream()
                    .anyMatch(code -> code.getWarningCode().equals(USE_OF_DEPRECATED_TABLE_PROPERTY.toWarningCode())));
            assertUpdate(session, "DROP TABLE " + tableName);
        });
    }

    protected HdfsEnvironment getHdfsEnvironment()
    {
        HiveClientConfig hiveClientConfig = new HiveClientConfig();
        MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
        HiveS3Config hiveS3Config = new HiveS3Config();
        return IcebergDistributedTestBase.getHdfsEnvironment(hiveClientConfig, metastoreClientConfig, hiveS3Config);
    }

    /**
     * Based on the default table properties and their default values, construct a customized map of
     * table properties which applies the specified override table properties
     */
    protected Map<String, String> getCustomizedTableProperties(Map<String, String> overrideProperties)
    {
        Map<String, String> propertiesMap = new HashMap<>();
        propertiesMap.put("write.delete.mode", "'merge-on-read'");
        propertiesMap.put("write.format.default", "'PARQUET'");
        propertiesMap.put("format-version", "'2'");
        propertiesMap.put("write.metadata.delete-after-commit.enabled", "false");
        propertiesMap.put("write.metadata.previous-versions-max", "100");
        propertiesMap.put("write.metadata.metrics.max-inferred-column-defaults", "100");
        propertiesMap.put("write.update.mode", "'merge-on-read'");
        propertiesMap.put("read.split.target-size", "134217728");

        propertiesMap.putAll(overrideProperties);
        return ImmutableMap.copyOf(propertiesMap);
    }

    protected void validatePropertiesForShowCreateTable(String table, Map<String, String> propertyDescriptions)
    {
        String catalog = getSession().getCatalog().get();
        String schema = getSession().getSchema().get();
        validateShowCreateTableInner(catalog, schema, table, Optional.empty(),
                Optional.empty(), propertyDescriptions);
    }

    protected void validatePropertiesForShowCreateTable(String catalog, String schema, String table, Map<String, String> propertyDescriptions)
    {
        validateShowCreateTableInner(catalog, schema, table, Optional.empty(),
                Optional.empty(), propertyDescriptions);
    }

    protected void validateShowCreateTable(String table,
                                           List<ColumnDefinition> columnDefinitions,
                                           Map<String, String> propertyDescriptions)
    {
        String catalog = getSession().getCatalog().get();
        String schema = getSession().getSchema().get();
        validateShowCreateTableInner(catalog, schema, table, Optional.ofNullable(columnDefinitions),
                Optional.empty(), propertyDescriptions);
    }

    protected void validateShowCreateTable(String catalog, String schema, String table,
                                           List<ColumnDefinition> columnDefinitions,
                                           String comment,
                                           Map<String, String> propertyDescriptions)
    {
        validateShowCreateTableInner(catalog, schema, table, Optional.ofNullable(columnDefinitions),
                Optional.ofNullable(comment), propertyDescriptions);
    }

    protected ColumnDefinition columnDefinition(String name, String type)
    {
        return new ColumnDefinition(new Identifier(name, true), type, true, ImmutableList.of(), Optional.empty());
    }

    private void validateShowCreateTableInner(String catalog, String schema, String table,
                                              Optional<List<ColumnDefinition>> columnDefinitions,
                                              Optional<String> commentDescription,
                                              Map<String, String> propertyDescriptions)
    {
        MaterializedResult showCreateTable = computeActual(format("SHOW CREATE TABLE %s.%s.%s", catalog, schema, table));
        String createTableSql = (String) getOnlyElement(showCreateTable.getOnlyColumnAsSet());

        SqlParser parser = new SqlParser();
        parser.createStatement(createTableSql).accept(new AstVisitor<Void, Void>() {
            @Override
            protected Void visitCreateTable(CreateTable node, Void context)
            {
                columnDefinitions.ifPresent(columnDefinitionList -> {
                    ImmutableList.Builder<ColumnDefinition> columnDefinitionsBuilder = ImmutableList.builder();
                    node.getElements().forEach(element -> element.accept(new AstVisitor<Void, Void>() {
                        @Override
                        protected Void visitColumnDefinition(ColumnDefinition node, Void context)
                        {
                            columnDefinitionsBuilder.add(node);
                            return null;
                        }
                    }, null));
                    assertEquals(columnDefinitionList, columnDefinitionsBuilder.build());
                });

                commentDescription.ifPresent(comment -> {
                    assertTrue(node.getComment().isPresent());
                    assertEquals(comment, node.getComment().get());
                });

                ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
                node.getProperties().forEach(property -> {
                    propertiesBuilder.put(property.getName().getValue(), property.getValue().toString());
                });
                assertEquals(propertyDescriptions, propertiesBuilder.build());
                return null;
            }
        }, null);
    }

    private static String getMetadataFileLocation(ConnectorSession session, HdfsEnvironment hdfsEnvironment, String schema, String table, String metadataLocation)
    {
        metadataLocation = stripTrailingSlash(metadataLocation);
        org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metadataLocation, METADATA_FOLDER_NAME);
        FileSystem fileSystem = getFileSystem(session, hdfsEnvironment, new SchemaTableName(schema, table), metadataDir);
        return resolveLatestMetadataLocation(
                session,
                fileSystem,
                metadataDir).getName();
    }
}
